Re: Re: Re: How to generate a StreamGraph from the Flink execution plan JSON?

2022-12-01 Thread 仙路尽头谁为峰
How do you specify that? The docs don't seem to cover it. SQL operator semantics are not one-to-one with DataStream operators; so far it looks like only the sink parallelism can be configured.
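
For what it's worth, a minimal illustration of that one knob (the connector and option values below are made up; 'sink.parallelism' is only honored by connectors that implement the FLIP-146 option):

```sql
-- Illustrative only: the writer of this sink table runs with parallelism 4,
-- assuming the chosen connector supports the 'sink.parallelism' option.
CREATE TABLE sink_table (
  id INT,
  name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/db',
  'table-name' = 'target',
  'sink.parallelism' = '4'
);
```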

Sent from Mail for Windows

From: casel.chen
Sent: December 2, 2022 12:45
To: user-zh@flink.apache.org
Subject: Re: Re: Re: How to generate a StreamGraph from the Flink execution plan JSON?

A SQL job is ultimately also translated through the Stream API into a JobGraph, so it should likewise be possible to modify the parallelism of each operator.
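
As a rough sketch of that path (the datagen table and query below are made up for illustration), a SQL query bridged back to the DataStream API ends up in the same StreamGraph as a DataStream job, and the JSON for the Plan Visualizer can be dumped from the stream environment:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PlanJsonDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // A throwaway source table, just so the query has something to read.
        tableEnv.executeSql(
            "CREATE TEMPORARY TABLE src (id INT, name STRING) WITH ('connector' = 'datagen')");
        Table result = tableEnv.sqlQuery("SELECT id, UPPER(name) AS name FROM src");

        // Bridging the Table back to the DataStream API adds the SQL operators to `env`,
        // so they show up in the execution-plan JSON below.
        tableEnv.toDataStream(result).print();

        // JSON topology that can be pasted into https://flink.apache.org/visualizer/
        System.out.println(env.getExecutionPlan());
    }
}
```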

On 2022-11-30 11:24:50, "仙路尽头谁为峰" wrote:
>SQL jobs don't seem to support changing the parallelism of individual operators; changing the parallelism means regenerating the JobGraph from scratch and resubmitting the job.
>The JSON is mainly for pasting into the Plan Visualizer for development and debugging.
>https://flink.apache.org/visualizer/
>Sent from Mail for Windows
>
>From: yidan zhao
>Sent: November 30, 2022 10:12
>To: user-zh@flink.apache.org
>Subject: Re: Re: How to generate a StreamGraph from the Flink execution plan JSON?
>
>Fair enough, I'm not that familiar with the SQL side; I mostly use the Stream API. As I understand it, the Stream
>API goes to a StreamGraph, then to a JobGraph, and the JobGraph is submitted directly to the cluster via the REST API for execution (standalone setup).
>
>casel.chen wrote on Wed, Nov 30, 2022 at 00:16:
>>
>> If we want to support adjusting the resources and parallelism of each operator in a Flink SQL job, don't we need to convert the JSON back into a StreamGraph and then submit it?
>>
>> On 2022-11-29 10:07:40, "yidan zhao" wrote:
>> >There's no need to generate a StreamGraph from the execution plan JSON~
>> >The StreamGraph is converted directly into a JobGraph before submission.
>> >
>> >casel.chen wrote on Mon, Nov 28, 2022 at 08:53:
>> >>
>> >> In the source code I only found how to generate the execution plan JSON from a StreamGraph, but not how to parse that JSON back into a StreamGraph. Any pointers would be appreciated.
>



Re: Re: Seamless upgrade of Flink SQL jobs

2022-12-01 Thread yidan zhao
Stopping the job first via a savepoint works. If you don't stop it, you have to consider whether your job can reprocess part of the data without affecting correctness.

If you take a savepoint but don't stop the old job, then once the new job starts, the old and new jobs will consume overlapping data; using the same group does not deduplicate the consumption.
That's because Kafka consumption has two modes, one group-based and one not bound by the group, and Flink uses the latter.

The approach I mentioned is to add a filter right after the Kafka source, parameterized by start and end timestamps, and filter records based on them.
These start/end values also need to be dynamically configurable, i.e. changeable without restarting the job.
That way you can start the new job first and configure it to begin consuming from some future point in time (ts), while configuring the old job to stop once it has consumed up to ts. Then wait until ts arrives, confirm the old job has finished processing all data before ts, and stop the old job.
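
A minimal sketch of that filter (the class and field names are hypothetical, not code from this thread): the start/end bounds arrive on a separate control stream and are broadcast to the filter subtasks, so they can be changed while the job keeps running:

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical event and control-message types.
class Event { public long timestamp; public String payload; }
class Bounds { public long startTs; public long endTs; }

public class BoundedConsumptionFilter extends BroadcastProcessFunction<Event, Bounds, Event> {

    // Broadcast state holding the currently active [startTs, endTs) window.
    public static final MapStateDescriptor<String, Bounds> BOUNDS =
        new MapStateDescriptor<>("consumption-bounds", Types.STRING, TypeInformation.of(Bounds.class));

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<Event> out) throws Exception {
        Bounds b = ctx.getBroadcastState(BOUNDS).get("current");
        // Forward the event only if it falls inside the configured window.
        if (b == null || (event.timestamp >= b.startTs && event.timestamp < b.endTs)) {
            out.collect(event);
        }
    }

    @Override
    public void processBroadcastElement(Bounds newBounds, Context ctx, Collector<Event> out) throws Exception {
        // Updating the bounds takes effect on every subtask without restarting the job.
        ctx.getBroadcastState(BOUNDS).put("current", newBounds);
    }
}
```

The wiring would then be roughly events.connect(boundsStream.broadcast(BoundedConsumptionFilter.BOUNDS)).process(new BoundedConsumptionFilter()), with the bounds stream fed from whatever dynamic-configuration channel you already have.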

casel.chen wrote on Fri, Dec 2, 2022 at 12:42:
>
> Take a Kafka source job as an example: the old and new jobs use the same consumer
> group, the old job takes a savepoint first, and then, while the old job is still running, the new job is started from that savepoint. Would that cause any problems?
> How do I set up a traffic switch so that the new job only starts taking traffic once it is "ready"? Are there any concrete, hands-on examples? Or do I need to modify the Flink source code myself, and if so, which class and method should I look at?
>
> On 2022-11-30 20:08:44, "Jiangang Liu" wrote:
> >Flink currently cannot do a truly seamless upgrade; you have to go through the stop-with-savepoint and start-job
> >flow, but you can optimize in between to shorten the recovery time. For example, start the new job first so it acquires its resources, stop the old job at the same time, and use the resulting savepoint to trigger execution of the new job.
> >
> >casel.chen wrote on Tue, Nov 29, 2022 at 08:38:
> >
> >> We have a high-traffic Flink SQL job in production that needs to be upgraded to add a business field. It is mainly a Kafka (canal) multi-table join written to MongoDB
> >> for data synchronization; apart from the source offsets there is no other state. How can we make the upgrade invisible to users?
> >> The usual stop-and-restart takes at least tens of seconds and triggers message-backlog alerts. Is it possible to start the new job first and stop the old one only after the new one runs smoothly? If the same Kafka
> >> consumer group is used, can the new job start from group-offsets? And how would we do a seamless upgrade for a job with large state?


Re: Re: Re: How to generate a StreamGraph from the Flink execution plan JSON?

2022-12-01 Thread casel.chen
A SQL job is ultimately also translated through the Stream API into a JobGraph, so it should likewise be possible to modify the parallelism of each operator.

On 2022-11-30 11:24:50, "仙路尽头谁为峰" wrote:
>SQL jobs don't seem to support changing the parallelism of individual operators; changing the parallelism means regenerating the JobGraph from scratch and resubmitting the job.
>The JSON is mainly for pasting into the Plan Visualizer for development and debugging.
>https://flink.apache.org/visualizer/
>Sent from Mail for Windows
>
>From: yidan zhao
>Sent: November 30, 2022 10:12
>To: user-zh@flink.apache.org
>Subject: Re: Re: How to generate a StreamGraph from the Flink execution plan JSON?
>
>Fair enough, I'm not that familiar with the SQL side; I mostly use the Stream API. As I understand it, the Stream
>API goes to a StreamGraph, then to a JobGraph, and the JobGraph is submitted directly to the cluster via the REST API for execution (standalone setup).
>
>casel.chen wrote on Wed, Nov 30, 2022 at 00:16:
>>
>> If we want to support adjusting the resources and parallelism of each operator in a Flink SQL job, don't we need to convert the JSON back into a StreamGraph and then submit it?
>>
>> On 2022-11-29 10:07:40, "yidan zhao" wrote:
>> >There's no need to generate a StreamGraph from the execution plan JSON~
>> >The StreamGraph is converted directly into a JobGraph before submission.
>> >
>> >casel.chen wrote on Mon, Nov 28, 2022 at 08:53:
>> >>
>> >> In the source code I only found how to generate the execution plan JSON from a StreamGraph, but not how to parse that JSON back into a StreamGraph. Any pointers would be appreciated.
>


Re: Re: Seamless upgrade of Flink SQL jobs

2022-12-01 Thread casel.chen

Take a Kafka source job as an example: the old and new jobs use the same consumer
group, the old job takes a savepoint first, and then, while the old job is still running, the new job is started from that savepoint. Would that cause any problems?
How do I set up a traffic switch so that the new job only starts taking traffic once it is "ready"? Are there any concrete, hands-on examples? Or do I need to modify the Flink source code myself, and if so, which class and method should I look at?


On 2022-11-30 20:08:44, "Jiangang Liu" wrote:
>Flink currently cannot do a truly seamless upgrade; you have to go through the stop-with-savepoint and start-job
>flow, but you can optimize in between to shorten the recovery time. For example, start the new job first so it acquires its resources, stop the old job at the same time, and use the resulting savepoint to trigger execution of the new job.
>
>casel.chen wrote on Tue, Nov 29, 2022 at 08:38:
>
>> We have a high-traffic Flink SQL job in production that needs to be upgraded to add a business field. It is mainly a Kafka (canal) multi-table join written to MongoDB
>> for data synchronization; apart from the source offsets there is no other state. How can we make the upgrade invisible to users?
>> The usual stop-and-restart takes at least tens of seconds and triggers message-backlog alerts. Is it possible to start the new job first and stop the old one only after the new one runs smoothly? If the same Kafka
>> consumer group is used, can the new job start from group-offsets? And how would we do a seamless upgrade for a job with large state?


Re: (Co)ProcessFunction vs Keyed(Co)ProcessFunction

2022-12-01 Thread Gen Luo
Hi Salva,

I suppose what you are missing is that, the timers are stored in the keyed
state, so you may only register timers when using KeyedCoProcessFunction.
If you try to register a timer in the CoProcessFunction, you'll get an
UnsupportedOperationException with the message "Setting timers is only
supported on a keyed streams.".

For the first question, as far as I know, there should be no difference
that matters.
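
For illustration, a minimal sketch of the keyed variant with a timer (the types and the 60-second delay are arbitrary, not taken from your code):

```java
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Registering the same timer inside a plain CoProcessFunction would throw the
// UnsupportedOperationException quoted above, because there is no key to scope it to.
public class TimerExample extends KeyedCoProcessFunction<String, String, String, String> {

    @Override
    public void processElement1(String value, Context ctx, Collector<String> out) {
        // Timers are scoped to ctx.getCurrentKey(), hence the keyed requirement.
        ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + 60_000L);
    }

    @Override
    public void processElement2(String value, Context ctx, Collector<String> out) {
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
    }
}
```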

Salva Alcántara wrote on Thu, Dec 1, 2022 at 18:27:

> The current docs claim [1]:
>
> "KeyedProcessFunction, as an extension of ProcessFunction, gives access
> to the key of timers in its onTimer(...) method."
>
> So, for what it's worth, it seems that if one does not need to query the
> current key, which within a `Keyed(Co)ProcessFunction` can be done like this
>
> ```
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT>
> out) throws Exception {
> K key = ctx.getCurrentKey();
> // ...
> }
> ```
>
> , they are mostly interchangeable (???). For instance, if I wanted to
> replace an existing `RichCoFlatMapFunction` that uses keyed
> state with a process function, should I use `CoProcessFunction<IN1, IN2, OUT>`
> or `KeyedCoProcessFunction<K, IN1, IN2, OUT>`? The type parameters
> for the `CoProcessFunction` exactly match those for the `RichCoFlatMap`.
> The current docs also claim that `CoProcessFunction` can work with keyed
> state [1]:
>
> "The ProcessFunction can be thought of as a FlatMapFunction with access to
> keyed state and timers."
>
> Also:
>
> "If you want to access keyed state and timers you have to apply the
> ProcessFunction on a keyed stream: `stream.keyBy(...).process(new
> MyProcessFunction());`."
>
> so it seems that the natural replacement choice should be the
> `CoProcessFunction` unless I'm missing something that strictly requires the
> usage of the keyed version (???). However, I've recently commented in [2]
> and I got a reply saying that I should go with `KeyedCoProcessFunction` if
> I'm using keyed state or timers which is a bit confusing to me. In summary:
>
> - Does replacing a `CoFlatMapFunction` with a `CoProcessFunction` preserve
> the original behaviour in presence of keyed state?
> - If timers are used later on, does it make any difference if I use
> `CoProcessFunction` instead of `KeyedCoProcessFunction`? In my case, I
> don't need to explicitly access the current key for anything
> (`ctx.getCurrentKey()`).
>
> Thanks in advance!
>
> Salva
>
> ---
>
> References
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/process_function/#the-keyedprocessfunction
> [2]
> https://stackoverflow.com/questions/63708216/coprocessfunction-vs-keyedcoprocessfunction-on-connected-keyed-streams/63711885?noredirect=1#comment131747225_63711885
>
>


Re: FLINK postgresql CDC reports a syntax error

2022-12-01 Thread bmw
The error occurs when querying; creating the table succeeds.
select * from postgres_cdc_test
Error:
[ERROR] Could not execute SQL statement. Reason:
org.postgresql.util.PSQLException: ERROR: syntax error


On 2022-12-02 10:09:37, "bmw" wrote:

Hi, Flink postgresql CDC with Flink 1.12 and PostgreSQL 9.6.21 reports an error:
CREATE TABLE postgres_cdc_test (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED 
) WITH (
  'connector' = 'postgres-cdc',   
  'hostname' = '192.168.1.101',
  'port' = '5432',  
  'username' = 'postgres',
  'password' = 'test', 
  'database-name' = 'test',
  'schema-name' = 'public',  
  'table-name' = 'test',   
  'debezium.slot.name' = 'customslotname',  
  'decoding.plugin.name' = 'pgoutput'
);


Error message:



FLINK postgresql CDC reports a syntax error

2022-12-01 Thread bmw
Hi, Flink postgresql CDC with Flink 1.12 and PostgreSQL 9.6.21 reports an error:
CREATE TABLE postgres_cdc_test (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED 
) WITH (
  'connector' = 'postgres-cdc',   
  'hostname' = '192.168.1.101',
  'port' = '5432',  
  'username' = 'postgres',
  'password' = 'test', 
  'database-name' = 'test',
  'schema-name' = 'public',  
  'table-name' = 'test',   
  'debezium.slot.name' = 'customslotname',  
  'decoding.plugin.name' = 'pgoutput'
);


Error message:



Re: flink-kubernetes-operator: image entrypoint misbehaves due to inability to write

2022-12-01 Thread Andrew Otto
Ah, got it.  Thanks!

On Thu, Dec 1, 2022 at 11:34 AM Gyula Fóra  wrote:

> As I also mentioned in the email, this is on our roadmap for the operator
> but we have not implemented it yet because this feature only became
> available as of Flink 1.16.
>
> Ideally in the operator FlinkDeployment spec.flinkConfiguration section
> the user should be able to use env vars if this is added.
>
> Gyula
>
> On Thu, Dec 1, 2022 at 5:18 PM Andrew Otto  wrote:
>
>> > Andrew please see my previous response, that covers the secrets case.
>> > kubernetes.jobmanager.entrypoint.args: -D
>> datadog.secret.conf=$MY_SECRET_ENV
>>
>> This way^?  Ya that makes sense.  It'd be nice if there was a way to get
>> Secrets into the values used for rendering flink-conf.yaml too, so the
>> confs will be all in the same place.
>>
>>
>>
>>
>>
>> On Thu, Dec 1, 2022 at 9:30 AM Gyula Fóra  wrote:
>>
>>> Andrew please see my previous response, that covers the secrets case.
>>>
>>> Gyula
>>>
>>> On Thu, Dec 1, 2022 at 2:54 PM Andrew Otto  wrote:
>>>
 > several failures to write into $FLINK_HOME/conf/.
 I'm working on
 
 building Flink and flink-kubernetes-operator images for the Wikimedia
 Foundation, and I found this strange as well.  It makes sense in a docker /
 docker-compose only environment, but in k8s where you have ConfigMap
 responsible for flink-conf.yaml, and (also logs all going to the console,
 not FLINK_HOME/log), I'd prefer if the image was not modified by the
 ENTRYPOINT.

 I believe that for flink-kubernetes-operator, the docker-entrypoint.sh
 
 provided by flink-docker is not really needed.  It seems to be written more
 for deployments outside of kubernetes.
  flink-kubernetes-operator never calls the built in subcommands (e.g.
 standalone-job), and always runs in 'pass-through' mode, just execing the
 args passed to it.  At WMF we build our own images, so I'm
 planning on removing all of the stuff in ENTRYPOINTs that mangles the
 image.  Anything that I might want to keep from docker-entrypoint.sh (like
 enabling jemalloc)
 I should be able to do in the Dockerfile at image creation time.

 >  want to set an API key as part of the flink-conf.yaml file, but we
 don't want it to be persisted in Kubernetes or in our version control
 I personally am still pretty green at k8s, but would using kubernetes
 Secrets
 
 work for your use case? I know we use them at WMF, but from a quick glance
 I'm not sure how to combine them in flink-kubernetes-operator's ConfigMap
 that renders flink-conf.yaml, but I feel like there should be a way.




 On Wed, Nov 30, 2022 at 4:59 PM Gyula Fóra 
 wrote:

> Hi Lucas!
>
> The Flink kubernetes integration itself is responsible for mounting
> the configmap and overwriting the entrypoint not the operator. Therefore
> this is not something we can easily change from the operator side. However
> I think we are looking at the problem from the wrong side and there may be
> a solution already :)
>
> Ideally what you want is ENV replacement in Flink configuration. This
> is not something that the Flink community has added yet unfortunately but
> we have it on our radar for the operator at least (
> https://issues.apache.org/jira/browse/FLINK-27491). It will probably
> be added in the next 1.4.0 version.
>
> This will be possible from Flink 1.16 which introduced a small feature
> that allows us to inject parameters to the kubernetes entrypoints:
> https://issues.apache.org/jira/browse/FLINK-29123
>
> https://github.com/apache/flink/commit/c37643031dca2e6d4c299c0d704081a8bffece1d
>
> While it's not implemented in the operator yet, you could try setting
> the following config in Flink 1.16.0:
> kubernetes.jobmanager.entrypoint.args: -D
> datadog.secret.conf=$MY_SECRET_ENV
> kubernetes.taskmanager.entrypoint.args: -D
> datadog.secret.conf=$MY_SECRET_ENV
>
> If you use this configuration together with the default native mode in
> the operator, it should work I believe.
>
> Please try and let me know!
> Gyula
>
> On Wed, Nov 30, 2022 at 10:36 PM Lucas Caparelli <
> lucas.capare...@gympass.com> wrote:
>
>> Hello folks,
>>
>> Not sure if this is the best list for this, sorry if it isn't. I'd
>> appreciate some pointers :-)
>>
>> When 

How to use local repository jars instead of pulling remote snapshots when building modules?

2022-12-01 Thread hjw
Hi, team.
Maven always pulls the remote dependency snapshot jars when I build the Flink
Kubernetes module.
I have also modified the Kubernetes module and the Flink client module that
Kubernetes depends on. I would like the build and the unit tests to use my local
repository instead of the remote snapshots.
I have tried deleting the _remote.repositories files of the relevant jars, but that
doesn't work.


Related information:
I have built and installed all Flink modules into my local repository with the Maven
command "mvn clean install -DskipTests -Dfast -Paliyun -Pskip-webui-build -T 1C".


Maven build Kubernetes Module command:
mvn clean test package -Paliyun


Here is my Maven settings.xml:

<settings>
  <localRepository>D:\software\maven\repo-flink</localRepository>

  <mirrors>
    <mirror>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <mirrorOf>central</mirrorOf>
    </mirror>
  </mirrors>

  <profiles>
    <profile>
      <id>aliyun</id>
      <repositories>
        <repository>
          <id>aliyun</id>
          <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
          </snapshots>
        </repository>
      </repositories>
    </profile>
  </profiles>
</settings>


--

Best,
Hjw

Re: flink-kubernetes-operator: image entrypoint misbehaves due to inability to write

2022-12-01 Thread Gyula Fóra
As I also mentioned in the email, this is on our roadmap for the operator
but we have not implemented it yet because this feature only became
available as of Flink 1.16.

Ideally in the operator FlinkDeployment spec.flinkConfiguration section the
user should be able to use env vars if this is added.

Gyula

On Thu, Dec 1, 2022 at 5:18 PM Andrew Otto  wrote:

> > Andrew please see my previous response, that covers the secrets case.
> > kubernetes.jobmanager.entrypoint.args: -D
> datadog.secret.conf=$MY_SECRET_ENV
>
> This way^?  Ya that makes sense.  It'd be nice if there was a way to get
> Secrets into the values used for rendering flink-conf.yaml too, so the
> confs will be all in the same place.
>
>
>
>
>
> On Thu, Dec 1, 2022 at 9:30 AM Gyula Fóra  wrote:
>
>> Andrew please see my previous response, that covers the secrets case.
>>
>> Gyula
>>
>> On Thu, Dec 1, 2022 at 2:54 PM Andrew Otto  wrote:
>>
>>> > several failures to write into $FLINK_HOME/conf/.
>>> I'm working on
>>> 
>>> building Flink and flink-kubernetes-operator images for the Wikimedia
>>> Foundation, and I found this strange as well.  It makes sense in a docker /
>>> docker-compose only environment, but in k8s where you have ConfigMap
>>> responsible for flink-conf.yaml, and (also logs all going to the console,
>>> not FLINK_HOME/log), I'd prefer if the image was not modified by the
>>> ENTRYPOINT.
>>>
>>> I believe that for flink-kubernetes-operator, the docker-entrypoint.sh
>>> 
>>> provided by flink-docker is not really needed.  It seems to be written more
>>> for deployments outside of kubernetes.
>>>  flink-kubernetes-operator never calls the built in subcommands (e.g.
>>> standalone-job), and always runs in 'pass-through' mode, just execing the
>>> args passed to it.  At WMF we build our own images, so I'm planning
>>> on removing all of the stuff in ENTRYPOINTs that mangles the image.
>>> Anything that I might want to keep from docker-entrypoint.sh (like enabling
>>> jemalloc)
>>> I should be able to do in the Dockerfile at image creation time.
>>>
>>> >  want to set an API key as part of the flink-conf.yaml file, but we
>>> don't want it to be persisted in Kubernetes or in our version control
>>> I personally am still pretty green at k8s, but would using kubernetes
>>> Secrets
>>> 
>>> work for your use case? I know we use them at WMF, but from a quick glance
>>> I'm not sure how to combine them in flink-kubernetes-operator's ConfigMap
>>> that renders flink-conf.yaml, but I feel like there should be a way.
>>>
>>>
>>>
>>>
>>> On Wed, Nov 30, 2022 at 4:59 PM Gyula Fóra  wrote:
>>>
 Hi Lucas!

 The Flink kubernetes integration itself is responsible for mounting the
 configmap and overwriting the entrypoint not the operator. Therefore this
 is not something we can easily change from the operator side. However I
 think we are looking at the problem from the wrong side and there may be a
 solution already :)

 Ideally what you want is ENV replacement in Flink configuration. This
 is not something that the Flink community has added yet unfortunately but
 we have it on our radar for the operator at least (
 https://issues.apache.org/jira/browse/FLINK-27491). It will probably
 be added in the next 1.4.0 version.

 This will be possible from Flink 1.16 which introduced a small feature
 that allows us to inject parameters to the kubernetes entrypoints:
 https://issues.apache.org/jira/browse/FLINK-29123

 https://github.com/apache/flink/commit/c37643031dca2e6d4c299c0d704081a8bffece1d

 While it's not implemented in the operator yet, you could try setting
 the following config in Flink 1.16.0:
 kubernetes.jobmanager.entrypoint.args: -D
 datadog.secret.conf=$MY_SECRET_ENV
 kubernetes.taskmanager.entrypoint.args: -D
 datadog.secret.conf=$MY_SECRET_ENV

 If you use this configuration together with the default native mode in
 the operator, it should work I believe.

 Please try and let me know!
 Gyula

 On Wed, Nov 30, 2022 at 10:36 PM Lucas Caparelli <
 lucas.capare...@gympass.com> wrote:

> Hello folks,
>
> Not sure if this is the best list for this, sorry if it isn't. I'd
> appreciate some pointers :-)
>
> When using flink-kubernetes-operator [1], docker-entrypoint.sh [2]
> goes through several failures to write into $FLINK_HOME/conf/. We believe
> this is due to this volume being mounted from a 

Re: flink-kubernetes-operator: image entrypoint misbehaves due to inability to write

2022-12-01 Thread Andrew Otto
> Andrew please see my previous response, that covers the secrets case.
> kubernetes.jobmanager.entrypoint.args: -D
datadog.secret.conf=$MY_SECRET_ENV

This way^?  Ya that makes sense.  It'd be nice if there was a way to get
Secrets into the values used for rendering flink-conf.yaml too, so the
confs will be all in the same place.





On Thu, Dec 1, 2022 at 9:30 AM Gyula Fóra  wrote:

> Andrew please see my previous response, that covers the secrets case.
>
> Gyula
>
> On Thu, Dec 1, 2022 at 2:54 PM Andrew Otto  wrote:
>
>> > several failures to write into $FLINK_HOME/conf/.
>> I'm working on
>> 
>> building Flink and flink-kubernetes-operator images for the Wikimedia
>> Foundation, and I found this strange as well.  It makes sense in a docker /
>> docker-compose only environment, but in k8s where you have ConfigMap
>> responsible for flink-conf.yaml, and (also logs all going to the console,
>> not FLINK_HOME/log), I'd prefer if the image was not modified by the
>> ENTRYPOINT.
>>
>> I believe that for flink-kubernetes-operator, the docker-entrypoint.sh
>> 
>> provided by flink-docker is not really needed.  It seems to be written more
>> for deployments outside of kubernetes.
>>  flink-kubernetes-operator never calls the built in subcommands (e.g.
>> standalone-job), and always runs in 'pass-through' mode, just execing the
>> args passed to it.  At WMF we build our own images, so I'm planning
>> on removing all of the stuff in ENTRYPOINTs that mangles the image.
>> Anything that I might want to keep from docker-entrypoint.sh (like enabling
>> jemalloc)
>> I should be able to do in the Dockerfile at image creation time.
>>
>> >  want to set an API key as part of the flink-conf.yaml file, but we
>> don't want it to be persisted in Kubernetes or in our version control
>> I personally am still pretty green at k8s, but would using kubernetes
>> Secrets
>> 
>> work for your use case? I know we use them at WMF, but from a quick glance
>> I'm not sure how to combine them in flink-kubernetes-operator's ConfigMap
>> that renders flink-conf.yaml, but I feel like there should be a way.
>>
>>
>>
>>
>> On Wed, Nov 30, 2022 at 4:59 PM Gyula Fóra  wrote:
>>
>>> Hi Lucas!
>>>
>>> The Flink kubernetes integration itself is responsible for mounting the
>>> configmap and overwriting the entrypoint not the operator. Therefore this
>>> is not something we can easily change from the operator side. However I
>>> think we are looking at the problem from the wrong side and there may be a
>>> solution already :)
>>>
>>> Ideally what you want is ENV replacement in Flink configuration. This is
>>> not something that the Flink community has added yet unfortunately but we
>>> have it on our radar for the operator at least (
>>> https://issues.apache.org/jira/browse/FLINK-27491). It will probably be
>>> added in the next 1.4.0 version.
>>>
>>> This will be possible from Flink 1.16 which introduced a small feature
>>> that allows us to inject parameters to the kubernetes entrypoints:
>>> https://issues.apache.org/jira/browse/FLINK-29123
>>>
>>> https://github.com/apache/flink/commit/c37643031dca2e6d4c299c0d704081a8bffece1d
>>>
>>> While it's not implemented in the operator yet, you could try setting
>>> the following config in Flink 1.16.0:
>>> kubernetes.jobmanager.entrypoint.args: -D
>>> datadog.secret.conf=$MY_SECRET_ENV
>>> kubernetes.taskmanager.entrypoint.args: -D
>>> datadog.secret.conf=$MY_SECRET_ENV
>>>
>>> If you use this configuration together with the default native mode in
>>> the operator, it should work I believe.
>>>
>>> Please try and let me know!
>>> Gyula
>>>
>>> On Wed, Nov 30, 2022 at 10:36 PM Lucas Caparelli <
>>> lucas.capare...@gympass.com> wrote:
>>>
 Hello folks,

 Not sure if this is the best list for this, sorry if it isn't. I'd
 appreciate some pointers :-)

 When using flink-kubernetes-operator [1], docker-entrypoint.sh [2] goes
 through several failures to write into $FLINK_HOME/conf/. We believe this
 is due to this volume being mounted from a ConfigMap, which means it's
 read-only.

 This has been reported in the past in GCP's operator, but I was unable
 to find any kind of resolution for it:
 https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/213

 In our use case, we want to set an API key as part of the
 flink-conf.yaml file, but we don't want it to be persisted in Kubernetes or
 in our version control, since it's sensitive data. This API Key is used by

Re: flink-kubernetes-operator: image entrypoint misbehaves due to inability to write

2022-12-01 Thread Gyula Fóra
Andrew please see my previous response, that covers the secrets case.

Gyula

On Thu, Dec 1, 2022 at 2:54 PM Andrew Otto  wrote:

> > several failures to write into $FLINK_HOME/conf/.
> I'm working on
> 
> building Flink and flink-kubernetes-operator images for the Wikimedia
> Foundation, and I found this strange as well.  It makes sense in a docker /
> docker-compose only environment, but in k8s where you have ConfigMap
> responsible for flink-conf.yaml, and (also logs all going to the console,
> not FLINK_HOME/log), I'd prefer if the image was not modified by the
> ENTRYPOINT.
>
> I believe that for flink-kubernetes-operator, the docker-entrypoint.sh
> 
> provided by flink-docker is not really needed.  It seems to be written more
> for deployments outside of kubernetes.
>  flink-kubernetes-operator never calls the built in subcommands (e.g.
> standalone-job), and always runs in 'pass-through' mode, just execing the
> args passed to it.  At WMF we build our own images, so I'm planning
> on removing all of the stuff in ENTRYPOINTs that mangles the image.
> Anything that I might want to keep from docker-entrypoint.sh (like enabling
> jemalloc)
> I should be able to do in the Dockerfile at image creation time.
>
> >  want to set an API key as part of the flink-conf.yaml file, but we
> don't want it to be persisted in Kubernetes or in our version control
> I personally am still pretty green at k8s, but would using kubernetes
> Secrets
> 
> work for your use case? I know we use them at WMF, but from a quick glance
> I'm not sure how to combine them in flink-kubernetes-operator's ConfigMap
> that renders flink-conf.yaml, but I feel like there should be a way.
>
>
>
>
> On Wed, Nov 30, 2022 at 4:59 PM Gyula Fóra  wrote:
>
>> Hi Lucas!
>>
>> The Flink kubernetes integration itself is responsible for mounting the
>> configmap and overwriting the entrypoint not the operator. Therefore this
>> is not something we can easily change from the operator side. However I
>> think we are looking at the problem from the wrong side and there may be a
>> solution already :)
>>
>> Ideally what you want is ENV replacement in Flink configuration. This is
>> not something that the Flink community has added yet unfortunately but we
>> have it on our radar for the operator at least (
>> https://issues.apache.org/jira/browse/FLINK-27491). It will probably be
>> added in the next 1.4.0 version.
>>
>> This will be possible from Flink 1.16 which introduced a small feature
>> that allows us to inject parameters to the kubernetes entrypoints:
>> https://issues.apache.org/jira/browse/FLINK-29123
>>
>> https://github.com/apache/flink/commit/c37643031dca2e6d4c299c0d704081a8bffece1d
>>
>> While it's not implemented in the operator yet, you could try setting the
>> following config in Flink 1.16.0:
>> kubernetes.jobmanager.entrypoint.args: -D
>> datadog.secret.conf=$MY_SECRET_ENV
>> kubernetes.taskmanager.entrypoint.args: -D
>> datadog.secret.conf=$MY_SECRET_ENV
>>
>> If you use this configuration together with the default native mode in
>> the operator, it should work I believe.
>>
>> Please try and let me know!
>> Gyula
>>
>> On Wed, Nov 30, 2022 at 10:36 PM Lucas Caparelli <
>> lucas.capare...@gympass.com> wrote:
>>
>>> Hello folks,
>>>
>>> Not sure if this is the best list for this, sorry if it isn't. I'd
>>> appreciate some pointers :-)
>>>
>>> When using flink-kubernetes-operator [1], docker-entrypoint.sh [2] goes
>>> through several failures to write into $FLINK_HOME/conf/. We believe this
>>> is due to this volume being mounted from a ConfigMap, which means it's
>>> read-only.
>>>
>>> This has been reported in the past in GCP's operator, but I was unable
>>> to find any kind of resolution for it:
>>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/213
>>>
>>> In our use case, we want to set an API key as part of the
>>> flink-conf.yaml file, but we don't want it to be persisted in Kubernetes or
>>> in our version control, since it's sensitive data. This API Key is used by
>>> Flink to report metrics to Datadog [3].
>>>
>>> We have automation in place which allows us to accomplish this by
>>> setting environment variables pointing to a path in our secret manager,
>>> which only gets injected during runtime. That part is working fine.
>>>
>>> However, we're trying to inject this secret using the FLINK_PROPERTIES
>>> variable, which is appended [4] to the flink-conf.yaml file in the
>>> docker-entrypoint script, which fails due to the filesystem where the 

Re: flink-kubernetes-operator: image entrypoint misbehaves due to inability to write

2022-12-01 Thread Andrew Otto
> several failures to write into $FLINK_HOME/conf/.
I'm working on
building Flink and flink-kubernetes-operator images for the Wikimedia
Foundation, and I found this strange as well.  It makes sense in a docker /
docker-compose only environment, but in k8s where you have ConfigMap
responsible for flink-conf.yaml, and (also logs all going to the console,
not FLINK_HOME/log), I'd prefer if the image was not modified by the
ENTRYPOINT.

I believe that for flink-kubernetes-operator, the docker-entrypoint.sh
provided by flink-docker is not really needed.  It seems to be written more
for deployments outside of kubernetes.
 flink-kubernetes-operator never calls the built in subcommands (e.g.
standalone-job), and always runs in 'pass-through' mode, just execing the
args passed to it.  At WMF we build our own images, so I'm planning on
removing all of the stuff in ENTRYPOINTs that mangles the image.  Anything
that I might want to keep from docker-entrypoint.sh (like enabling jemalloc)
I should be able to do in the Dockerfile at image creation time.

>  want to set an API key as part of the flink-conf.yaml file, but we don't
want it to be persisted in Kubernetes or in our version control
I personally am still pretty green at k8s, but would using kubernetes
Secrets
work for your use case? I know we use them at WMF, but from a quick glance
I'm not sure how to combine them in flink-kubernetes-operator's ConfigMap
that renders flink-conf.yaml, but I feel like there should be a way.




On Wed, Nov 30, 2022 at 4:59 PM Gyula Fóra  wrote:

> Hi Lucas!
>
> The Flink kubernetes integration itself is responsible for mounting the
> configmap and overwriting the entrypoint not the operator. Therefore this
> is not something we can easily change from the operator side. However I
> think we are looking at the problem from the wrong side and there may be a
> solution already :)
>
> Ideally what you want is ENV replacement in Flink configuration. This is
> not something that the Flink community has added yet unfortunately but we
> have it on our radar for the operator at least (
> https://issues.apache.org/jira/browse/FLINK-27491). It will probably be
> added in the next 1.4.0 version.
>
> This will be possible from Flink 1.16 which introduced a small feature
> that allows us to inject parameters to the kubernetes entrypoints:
> https://issues.apache.org/jira/browse/FLINK-29123
>
> https://github.com/apache/flink/commit/c37643031dca2e6d4c299c0d704081a8bffece1d
>
> While it's not implemented in the operator yet, you could try setting the
> following config in Flink 1.16.0:
> kubernetes.jobmanager.entrypoint.args: -D
> datadog.secret.conf=$MY_SECRET_ENV
> kubernetes.taskmanager.entrypoint.args: -D
> datadog.secret.conf=$MY_SECRET_ENV
>
> If you use this configuration together with the default native mode in the
> operator, it should work I believe.
>
> Please try and let me know!
> Gyula
>
> On Wed, Nov 30, 2022 at 10:36 PM Lucas Caparelli <
> lucas.capare...@gympass.com> wrote:
>
>> Hello folks,
>>
>> Not sure if this is the best list for this, sorry if it isn't. I'd
>> appreciate some pointers :-)
>>
>> When using flink-kubernetes-operator [1], docker-entrypoint.sh [2] goes
>> through several failures to write into $FLINK_HOME/conf/. We believe this
>> is due to this volume being mounted from a ConfigMap, which means it's
>> read-only.
>>
>> This has been reported in the past in GCP's operator, but I was unable to
>> find any kind of resolution for it:
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/213
>>
>> In our use case, we want to set an API key as part of the flink-conf.yaml
>> file, but we don't want it to be persisted in Kubernetes or in our version
>> control, since it's sensitive data. This API Key is used by Flink to report
>> metrics to Datadog [3].
>>
>> We have automation in place which allows us to accomplish this by setting
>> environment variables pointing to a path in our secret manager, which only
>> gets injected during runtime. That part is working fine.
>>
>> However, we're trying to inject this secret using the FLINK_PROPERTIES
>> variable, which is appended [4] to the flink-conf.yaml file in the
>> docker-entrypoint script, which fails due to the filesystem where the file
>> is being read-only.
>>
>> We attempted working around this in 2 different ways:
>>
>>   - providing our own .spec.containers[0].command, where we copied over
>> /opt/flink to /tmp/flink and set FLINK_HOME=/tmp/flink. This did not work
>> because the 

(Co)ProcessFunction vs Keyed(Co)ProcessFunction

2022-12-01 Thread Salva Alcántara
The current docs claim [1]:

"KeyedProcessFunction, as an extension of ProcessFunction, gives access to
the key of timers in its onTimer(...) method."

So, for what it's worth, it seems that if one does not need to query the
current key, which within a `Keyed(Co)ProcessFunction` can be done like this

```
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out)
throws Exception {
K key = ctx.getCurrentKey();
// ...
}
```

, they are mostly interchangeable (???). For instance, if I wanted to
replace an existing `RichCoFlatMapFunction` that uses keyed
state with a process function, should I use `CoProcessFunction<IN1, IN2, OUT>`
or `KeyedCoProcessFunction<K, IN1, IN2, OUT>`? The type parameters
for the `CoProcessFunction` exactly match those for the `RichCoFlatMap`.
The current docs also claim that `CoProcessFunction` can work with keyed
state [1]:

"The ProcessFunction can be thought of as a FlatMapFunction with access to
keyed state and timers."

Also:

"If you want to access keyed state and timers you have to apply the
ProcessFunction on a keyed stream: `stream.keyBy(...).process(new
MyProcessFunction());`."

so it seems that the natural replacement choice should be the
`CoProcessFunction` unless I'm missing something that strictly requires the
usage of the keyed version (???). However, I've recently commented in [2]
and I got a reply saying that I should go with `KeyedCoProcessFunction` if
I'm using keyed state or timers which is a bit confusing to me. In summary:

- Does replacing a `CoFlatMapFunction` with a `CoProcessFunction` preserve
the original behaviour in presence of keyed state?
- If timers are used later on, does it make any difference if I use
`CoProcessFunction` instead of `KeyedCoProcessFunction`? In my case, I
don't need to explicitly access the current key for anything
(`ctx.getCurrentKey()`).

Thanks in advance!

Salva

---

References

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/process_function/#the-keyedprocessfunction
[2]
https://stackoverflow.com/questions/63708216/coprocessfunction-vs-keyedcoprocessfunction-on-connected-keyed-streams/63711885?noredirect=1#comment131747225_63711885


Re: Difference between DataStream.broadcast() vs DataStream.broadcast(MapStateDescriptor)

2022-12-01 Thread Gen Luo
DataStream.broadcast() only determines the distribution behavior: every element
of the stream is broadcast to all downstream tasks. The downstream can be a
single-input operator, or a co-processing operator if the stream is connected to
another stream.

DataStream.broadcast(MapStateDescriptor) can only be used to connect to another
stream, and it declares one or more state descriptors that allow the
BroadcastProcessFunction that follows to keep some state. That is the
BroadcastState mentioned in the SO answer. BroadcastState is a kind of operator
state, but it assumes the state of all instances is exactly the same, so it can be
duplicated to new instances when the job is restarted with the operator scaled up.
That behavior differs from normal operator state.

In short, if you need BroadcastState, use
DataStream.broadcast(MapStateDescriptor); if you only want to broadcast the
elements, DataStream.broadcast() is enough.
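
For illustration, a minimal sketch of the two wirings (the stream contents and names are made up):

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastVariants {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> updates = env.fromElements("rule-1", "rule-2"); // stream 1: updates
        DataStream<Long> events = env.fromElements(1L, 2L, 3L);            // stream 2: main data

        // 1) Plain broadcast(): only the partitioning changes; the downstream is an
        //    ordinary two-input operator and any "state" is managed by your own code.
        updates.broadcast()
            .connect(events)
            .process(new CoProcessFunction<String, Long, String>() {
                @Override
                public void processElement1(String update, Context ctx, Collector<String> out) {
                    // every parallel subtask sees every update
                }

                @Override
                public void processElement2(Long event, Context ctx, Collector<String> out) {
                    out.collect("event " + event);
                }
            });

        // 2) broadcast(MapStateDescriptor): the update side is stored as BroadcastState,
        //    which Flink checkpoints and copies to new subtasks when the job is rescaled.
        MapStateDescriptor<String, String> desc =
            new MapStateDescriptor<>("updates", Types.STRING, Types.STRING);
        events.connect(updates.broadcast(desc))
            .process(new BroadcastProcessFunction<Long, String, String>() {
                @Override
                public void processElement(Long event, ReadOnlyContext ctx, Collector<String> out)
                        throws Exception {
                    String rule = ctx.getBroadcastState(desc).get("current");
                    out.collect("event " + event + " with rule " + rule);
                }

                @Override
                public void processBroadcastElement(String update, Context ctx, Collector<String> out)
                        throws Exception {
                    ctx.getBroadcastState(desc).put("current", update);
                }
            });

        env.execute("broadcast-variants");
    }
}
```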

Qing Lim wrote on Sat, Oct 22, 2022 at 00:21:

> Hi all, I am trying to figure out how Datastream.broadcast() and
> DataStream.broadcast(MapStateDescriptor) differ.
>
>
>
> My use case:
>
> I have 2 streams:
>
> Stream 1 contains updates, which collectively build up a state
>
> Stream 2 is keyed and every parallel instance need to connect with EVERY
> update from Stream 1.
>
>
>
> I am thinking I can probably achieve this by doing
>
>
>
> Stream1.broadcast().connect(stream2).process(myFun)
>
>
>
> I am failing to understand when I would need to use the Broadcast State
> pattern. Is it a convenience method built on top of broadcast(), or is it
> something very different?
>
>
>
> The best info I’ve found is from this SO:
> https://stackoverflow.com/questions/50570605/why-broadcast-state-can-store-the-dynamic-rules-however-broadcast-operator-c
>
> Which seems to suggest Broadcast State broadcast() then maintain state in
> each parallel operator under the hood?
>
>
>
> Kind regards.
>
>
>
> *Qing Lim *| Marshall Wace LLP, George House, 131 Sloane Street, London | 
> E-mail:
> q@mwam.com | Tel: +44 207 925 4865
>
>

