Re: Can expired Flink state be output to the log when it expires?

2022-07-07 Thread yidan zhao
You could consider printing the expiring entries yourself via a side output, or compensating with an additional computation.
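
Flink's state TTL does not currently expose an expiry callback the way a
Caffeine cache does, so below is a minimal sketch of the side-output idea,
assuming the 1.15 DataStream API. The output tag name, the 30-minute TTL and
the String types are illustrative, and expiry is managed with our own
processing-time timer rather than the built-in TTL:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Sketch: emulate "log on expiry" by registering a timer per key and emitting
// the expiring value to a side output before clearing it.
public class ExpiryLoggingFunction extends KeyedProcessFunction<String, String, String> {

    public static final OutputTag<String> EXPIRED = new OutputTag<String>("expired-state") {};

    private static final long TTL_MS = 30 * 60 * 1000L; // illustrative 30-minute TTL

    private transient ValueState<String> value;
    private transient ValueState<Long> lastUpdate;

    @Override
    public void open(Configuration parameters) {
        value = getRuntimeContext().getState(new ValueStateDescriptor<>("value", String.class));
        lastUpdate = getRuntimeContext().getState(new ValueStateDescriptor<>("lastUpdate", Long.class));
    }

    @Override
    public void processElement(String in, Context ctx, Collector<String> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        value.update(in);
        lastUpdate.update(now);
        // (re-)register an expiry timer for this key
        ctx.timerService().registerProcessingTimeTimer(now + TTL_MS);
        out.collect(in);
    }

    @Override
    public void onTimer(long ts, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long last = lastUpdate.value();
        // only expire if the key has not been updated since this timer was registered
        if (last != null && ts >= last + TTL_MS) {
            ctx.output(EXPIRED, "key=" + ctx.getCurrentKey() + " expired value=" + value.value());
            value.clear();
            lastUpdate.clear();
        }
    }
}

Downstream, the expiring entries can then be logged from the side output,
e.g. result.getSideOutput(ExpiryLoggingFunction.EXPIRED).print().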

haishui  wrote on Mon, Jun 27, 2022 at 14:10:
>
> Hi,
> Can Flink state expiration, like a Caffeine cache, invoke a callback when an entry expires so that the expired content can be printed to the log?
>
> Best Regards!


Re: Guide for building Flink image with Python doesn't work

2022-07-07 Thread Xingbo Huang
Hi Gyula,

According to the log, we can see that you downloaded the source package of
pemja, not the wheel package of pemja [1]. I guess you are using an M1
machine. If you install pemja from the source package, you need to have the
JDK, the gcc toolchain and a CPython with NumPy in the environment. I believe
this can be solved once you have prepared those tools, but other dependencies
of PyFlink 1.15 do not support M1, so PyFlink 1.15 cannot be installed and
used on M1.

We have added M1 support in release 1.16 [2]. If a large number of M1 users
have a strong demand for PyFlink 1.15, I think we need to consider whether it
is necessary to backport this support to 1.15, even though that would break
compatibility between minor versions.
Best,
Xingbo

[1] https://pypi.org/project/pemja/0.1.4/
[2] https://issues.apache.org/jira/browse/FLINK-25188

Gyula Fóra  wrote on Wed, Jul 6, 2022 at 13:56:

> Here it is, copied from the docs essentially:
>
> FROM flink:1.15.0
>
>
> # install python3: Debian 11 ships Python 3.9, so install Python 3.7 from source
> # PyFlink currently only officially supports Python 3.6, 3.7 and 3.8
>
> RUN apt-get update -y && \
> apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev
> libffi-dev git && \
> wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
> tar -xvf Python-3.7.9.tgz && \
> cd Python-3.7.9 && \
> ./configure --without-tests --enable-shared && \
> make -j6 && \
> make install && \
> ldconfig /usr/local/lib && \
> cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
> ln -s /usr/local/bin/python3 /usr/local/bin/python && \
> apt-get clean && \
> rm -rf /var/lib/apt/lists/*
>
> # install PyFlink
> RUN pip3 install apache-flink==1.15.0
>
> And I am running:
> docker build --tag pyflink:latest .
>
> This gives the following error:
>
>
> #6 64.12   cwd: /tmp/pip-install-9_farey_/pemja/
> #6 64.12   Complete output (1 lines):
> #6 64.12   Include folder should be at '/usr/local/openjdk-11/include' but doesn't
> exist. Please check you've installed the JDK properly.
>
> A side note:
> The Dockerfile in the docs is missing git so initially I got the following
> error:
>
> #7 57.73     raise OSError("%r was not found" % name)
> #7 57.73 OSError: 'git' was not found
>
> @Weihua Hu  can you please send your working
> Dockerfile?
>
> Gyula
>
> On Wed, Jul 6, 2022 at 4:16 AM Weihua Hu  wrote:
>
>> Hi Gyula,
>>
>> I can build the PyFlink image successfully by following this guide. Did you
>> add a dependency outside of the documentation? And could you provide your
>> Dockerfile?
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
>>
>> Best,
>> Weihua
>>
>>
>> On Tue, Jul 5, 2022 at 11:40 PM Gyula Fóra  wrote:
>>
>>> Well, in any case, either the official image is incorrect (maybe we should
>>> include the JDK by default, not just the JRE) or we should update the
>>> documentation regarding the Python docker build, because it simply doesn't
>>> work at the moment.
>>>
>>> I am still looking for a full working example that adds the required
>>> Python packages on top of a Flink 1.15.0 base image :)
>>>
>>> Gyula
>>>
>>> On Tue, Jul 5, 2022 at 5:36 PM Weihua Hu  wrote:
>>>
 In addition, you can try providing the Dockerfile

 Best,
 Weihua


 On Tue, Jul 5, 2022 at 11:24 PM Weihua Hu 
 wrote:

> Hi,
>
> The base image flink:1.15.0 is built from openjdk:11-jre, and this
> image only installs the JRE, not the JDK.
> It looks like the package you want to install (pemja) depends on the JDK,
> so you need to install openjdk-11-jdk in your Dockerfile;
> take a look at how it is installed in the official image:
>
>
> https://hub.docker.com/layers/openjdk/library/openjdk/11-jdk/images/sha256-bc0af19c7c4f492fe6ff0c1d1c8c0e5dd90ab801385b220347bb91dbe2b4094f?context=explore
>
>
> Best,
> Weihua
>
>
> On Tue, Jul 5, 2022 at 3:50 PM Gyula Fóra 
> wrote:
>
>> Hi All!
>>
>> I have been trying to experiment with the Flink python support on
>> Kubernetes but I got stuck creating a custom image with all the necessary
>> python libraries.
>>
>> I found this guide in the docs:
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
>>
>> However when I try to build a custom image using it, I get the
>> following error:
>>
>> #7 131.7 Collecting pemja==0.1.4
>> #7 131.8   Downloading pemja-0.1.4.tar.gz (32 kB)
>> #7 131.9 ERROR: Command errored out with exit status 255:
>> #7 131.9  command: /usr/local/bin/python3.7 -c 'import sys,
>> setuptools, tokenize; sys.argv[0] =
>> '"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"';
>> __file__='"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"';f=getattr(tokenize,
>> '"'"'open'"'"', 

Configure a kafka source dynamically (???)

2022-07-07 Thread Salva Alcántara
When using the Kafka connector, you need to set the topics in advance (by
giving a list of them or a regex pattern for the topic names). Imagine a
situation where the topics are not known in advance: of course you could
use an all-pass regex pattern to match all the topics in the broker, but
what I want to know is whether it's possible to read from new topics on
demand.
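
(For reference, a minimal sketch of the pattern-based subscription plus
periodic partition discovery that works today, assuming the 1.15 KafkaSource
builder API; the broker address, group id and discovery interval are
illustrative, and this is still polling-based discovery rather than the
push-based behaviour asked about below.)

import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class PatternSourceSketch {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")        // illustrative address
                .setGroupId("dynamic-topics-demo")         // illustrative group id
                .setTopicPattern(Pattern.compile(".*"))    // "all-pass" pattern
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // newly created topics matching the pattern are picked up by
                // periodic partition discovery (polling, not push)
                .setProperty("partition.discovery.interval.ms", "60000")
                .build();
    }
}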

E.g., initially the source starts without any topics to read from so
nothing is read until it gets a control msg (which could be pushed to a
control topic, for example) specifying the set of topics to subscribe to. I
guess this could be somehow implemented using custom subscribers once this
issue is merged/closed:

https://issues.apache.org/jira/browse/FLINK-24660

but would it be possible to achieve this objective without having to
periodically poll the broker, e.g., in a more reactive (push-based) way? I
guess if the Kafka source (or any other source, for that matter) were to have
a control signal like that, then it would really be more of an operator than a
source...

Salva

PS: Does anyone know the current state of FLINK-24660? The changes seem to
have been ready to merge for a while.


Re: [ANNOUNCE] Apache Flink 1.15.1 released

2022-07-07 Thread Xingbo Huang
Thanks a lot to David for being our release manager, and to everyone who
contributed.

Best,
Xingbo

David Anderson  wrote on Fri, Jul 8, 2022 at 06:18:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.15.1, which is the first bugfix release for the Apache Flink 1.15
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
>
> https://flink.apache.org/news/2022/07/06/release-1.15.1.html
>
> The full release notes are available in Jira:
>
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351546
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> David Anderson
>


Re: PyFlink: restoring from savepoint

2022-07-07 Thread Dian Fu
Hi John,

Could you provide more information, e.g. the exact command used to submit the
job, the log files, the PyFlink version, etc.?

Regards,
Dian


On Thu, Jul 7, 2022 at 7:53 PM John Tipper  wrote:

> Hi all,
>
> I have a PyFlink job running in Kubernetes. Savepoints and checkpoints are
> being successfully saved to S3. However, I am unable to get the job to
> start from a save point.
>
> The container is started with these args:
>
> “standalone-job”, “-pym”, “foo.main”, “-s”, “s3://”,
> “-n”
>
> In the JM logs I can see “Starting
> StandaloneApplicationClusterEntrypoint…” where my arguments are listed.
>
> However, I don’t see any restore occurring in the logs and my application
> restarts with no state. How do I start a PyFlink job like this from a given
> savepoint?
>
> Many thanks,
>
> John
>
> Sent from my iPhone


[ANNOUNCE] Apache Flink 1.15.1 released

2022-07-07 Thread David Anderson
The Apache Flink community is very happy to announce the release of Apache
Flink 1.15.1, which is the first bugfix release for the Apache Flink 1.15
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:

https://flink.apache.org/news/2022/07/06/release-1.15.1.html

The full release notes are available in Jira:


https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351546

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
David Anderson


Re: Restoring a job from a savepoint

2022-07-07 Thread John Tipper
Thank you all, that’s very helpful. It looks like there’s something else that’s 
causing my cluster to not load my savepoints, so I’ve submitted a separate 
query for that.

Many thanks,

John

Sent from my iPhone

On 6 Jul 2022, at 21:24, Alexander Fedulov  wrote:


Hi John,

use $ bin/flink run -s s3://my_bucket/path/to/savepoints/ (no
trailing slash; include the scheme),

where the given savepoint directory should contain a valid _metadata file.

You should see logs like this:
INFO o.a.f.r.c.CheckpointCoordinator [] - Starting job foobar from savepoint s3://my_bucket/path/to/savepoints/ ()
INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job foobar from Savepoint 1 @ 0 for foobar located at s3://my_bucket/path/to/savepoints/.

The indication of the correct restore should be the absence of exceptions. You 
might see messages like this one for operators that did not have any state in 
the savepoint:
INFO o.a.f.r.c.CheckpointCoordinator [] - Skipping empty savepoint state for 
operator a0f11f7a2c416beb6b7aed14be0d63ca.

Best,
Alexander Fedulov


On Wed, Jul 6, 2022 at 9:50 PM John Tipper  wrote:
Hi all,


The docs on restoring a job from a savepoint 
(https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#resuming-from-savepoints)
 state that the syntax is:


$ bin/flink run -s :savepointPath [:runArgs]

and where "you may give a path to either the savepoint’s directory or the 
_metadata file."


If I am using S3 as my store of state:

state.savepoints.dir: s3://my_bucket/path/to/savepoints

and an example savepoint is at:


s3://my_bucket/path/to/savepoints//_metadata


then what am I supposed to supply to the flink run command? Is it:


  1.  The full path including filesystem: 
s3://my_bucket/path/to/savepoints//_metadata or 
s3://my_bucket/path/to/savepoints/
  2.  or the full path: my_bucket/path/to/savepoints//_metadata 
or my_bucket/path/to/savepoints/
  3.  or the path relative to the savepoints directory: /_metadata or 

If I supply a directory, do I need to specify a trailing slash?

Also, is there anything that I will see in the logs that will indicate that the 
restore from a savepoint has been successful?

Many thanks,

John



Re: Re: Flink interval join watermark question

2022-07-07 Thread yidan zhao
Your code isn't formatted, which makes it hard to read.
First of all, everything we are discussing here is the event-time case; in the processing-time case watermarks are not used.

Suppose a Record represents one request. Then the time at which that request happened can be used as the Record's event time, i.e. the Record's timestamp.
Now, to compute PV statistics per 5 minutes, you need the Record's timestamp to decide which window it is assigned to, e.g. the window yyyyMMdd 8:30 ~
yyyyMMdd 8:35.
Since data can arrive late, at any moment, say even at 8:40, a record with a timestamp of 8:33 may still show up.
However, Flink cannot wait indefinitely; the 8:35 window has to close at some point and emit that window's result downstream.
With the EventTimeTrigger, the moment at which the window closes is when the
watermark reaches the window's maxTimestamp; for this window, the maxTimestamp is the 8:35
point.
The watermark itself is generated according to the strategy the user chooses. For example, at the source, the watermark is the largest Record timestamp the
current task has seen minus a maxOutOfOrderness; that maxOutOfOrderness is the amount of out-of-orderness you are willing to tolerate.

As for watermark propagation, in short it is simply broadcast downstream.
In the two-stream join case, the minimum is taken, i.e. the smaller of the two watermarks.
For watermarks coming in from the same stream, the maximum is taken (if the generation logic is correct, the producing side already guarantees they are monotonically increasing; the check on the receiving side is just a safeguard against the watermark ever going backwards).
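
(A minimal sketch of the above in code, assuming the 1.15 DataStream API; the
Record type, its fields and the 30-second out-of-orderness bound are
illustrative.)

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class PvPerFiveMinutes {

    // Illustrative record: a request with its occurrence time (the event time) and a page id.
    public static class Record {
        public long timestamp;
        public String page;
    }

    // Simple count-per-window aggregate.
    public static class CountAgg implements AggregateFunction<Record, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(Record value, Long acc) { return acc + 1; }
        public Long getResult(Long acc) { return acc; }
        public Long merge(Long a, Long b) { return a + b; }
    }

    public static DataStream<Long> pvPer5Min(DataStream<Record> records) {
        return records
                // watermark = max(timestamp seen so far) - 30s allowed out-of-orderness
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Record>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                                .withTimestampAssigner((r, previous) -> r.timestamp))
                .keyBy(r -> r.page)
                // the record's timestamp decides which 5-minute window it belongs to;
                // the window fires once the watermark passes the window's maxTimestamp
                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
                .aggregate(new CountAgg());
    }
}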

lxk  wrote on Thu, Jul 7, 2022 at 19:57:
>
> According to that explanation, the timestamp and the watermark are actually unrelated.
> But I have seen posts saying that in a two-stream join the data is stored in MapState,
> and that both StreamRecord and Watermark extend StreamElement: Flink replaces the Timestamp in the
> StreamRecord object, and if the Watermark generated from the current event's Timestamp is larger
> than the previous Watermark, a new Watermark is emitted.
> The relevant code is in TimestampsAndPunctuatedWatermarksOperator.processElement:
>
> @Override
> public void processElement(StreamRecord<T> element) throws Exception {
>     final T value = element.getValue();
>     // call the user-implemented extractTimestamp to obtain the new timestamp
>     final long newTimestamp = userFunction.extractTimestamp(value,
>             element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
>     // replace the old timestamp in the StreamRecord with the new one
>     output.collect(element.replace(element.getValue(), newTimestamp));
>     // call the user-implemented checkAndGetNextWatermark to obtain the next watermark
>     final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
>     // if the next watermark is larger than the current one, emit it
>     if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
>         currentWatermark = nextWatermark.getTimestamp();
>         output.emitWatermark(nextWatermark);
>     }
> }
>
> That is the relevant content from the posts I have seen.
> If that explanation is not correct, how does the watermark actually flow in a two-stream join?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2022-07-07 10:03:00, "yidan zhao"  wrote:
> >The timestamp is a value assigned to each element (each input record).
> >The watermark is generated at the source and then propagated downstream.
> >
> >Taking windows as an example, the record's timestamp decides which window the record is assigned to.
> >The watermark decides when a window closes; for example, if the window is 0-5s, the window closes once the watermark reaches 5s.
> >
> >Since data may not always arrive on time, you can set watermark = max(timestamp) - 30s; the 30s is then the amount of out-of-orderness that is tolerated.
> >
> >lxk  wrote on Wed, Jul 6, 2022 at 13:36:
> >>
> >> I have a few questions about using interval join and hope someone can help answer them.
> >> https://pic.imgdb.cn/item/62c5015b5be16ec74ac2b23f.png
> >> The official documentation says the maximum of the timestamps from the two streams is taken, and looking at the source code that is indeed the case.
> >> https://pic.imgdb.cn/item/62c51e8b5be16ec74ae22880.png
> >> My questions are:
> >> 1. What is the difference between the timestamp and the watermark here?
> >> 2. How is the watermark computed in an interval join? After taking the maximum
> >> timestamp from the two streams, is the watermark related to that maximum timestamp in some way?
>
>


Re: Re: Flink interval join watermark question

2022-07-07 Thread lxk
According to that explanation, the timestamp and the watermark are actually unrelated.
But I have seen posts saying that in a two-stream join the data is stored in MapState,
and that both StreamRecord and Watermark extend StreamElement: Flink replaces the Timestamp in the
StreamRecord object, and if the Watermark generated from the current event's Timestamp is larger
than the previous Watermark, a new Watermark is emitted.
The relevant code is in TimestampsAndPunctuatedWatermarksOperator.processElement:

@Override
public void processElement(StreamRecord<T> element) throws Exception {
    final T value = element.getValue();
    // call the user-implemented extractTimestamp to obtain the new timestamp
    final long newTimestamp = userFunction.extractTimestamp(value,
            element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
    // replace the old timestamp in the StreamRecord with the new one
    output.collect(element.replace(element.getValue(), newTimestamp));
    // call the user-implemented checkAndGetNextWatermark to obtain the next watermark
    final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
    // if the next watermark is larger than the current one, emit it
    if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
        currentWatermark = nextWatermark.getTimestamp();
        output.emitWatermark(nextWatermark);
    }
}

That is the relevant content from the posts I have seen.
If that explanation is not correct, how does the watermark actually flow in a two-stream join?














On 2022-07-07 10:03:00, "yidan zhao"  wrote:
>The timestamp is a value assigned to each element (each input record).
>The watermark is generated at the source and then propagated downstream.
>
>Taking windows as an example, the record's timestamp decides which window the record is assigned to.
>The watermark decides when a window closes; for example, if the window is 0-5s, the window closes once the watermark reaches 5s.
>
>Since data may not always arrive on time, you can set watermark = max(timestamp) - 30s; the 30s is then the amount of out-of-orderness that is tolerated.
>
>lxk  wrote on Wed, Jul 6, 2022 at 13:36:
>>
>> I have a few questions about using interval join and hope someone can help answer them.
>> https://pic.imgdb.cn/item/62c5015b5be16ec74ac2b23f.png
>> The official documentation says the maximum of the timestamps from the two streams is taken, and looking at the source code that is indeed the case.
>> https://pic.imgdb.cn/item/62c51e8b5be16ec74ae22880.png
>> My questions are:
>> 1. What is the difference between the timestamp and the watermark here?
>> 2. How is the watermark computed in an interval join? After taking the maximum
>> timestamp from the two streams, is the watermark related to that maximum timestamp in some way?




PyFlink: restoring from savepoint

2022-07-07 Thread John Tipper
Hi all,

I have a PyFlink job running in Kubernetes. Savepoints and checkpoints are 
being successfully saved to S3. However, I am unable to get the job to start 
from a save point.

The container is started with these args:

“standalone-job”, “-pym”, “foo.main”, “-s”, “s3://”, “-n”

In the JM logs I can see “Starting StandaloneApplicationClusterEntrypoint…” 
where my arguments are listed.

However, I don’t see any restore occurring in the logs and my application 
restarts with no state. How do I start a PyFlink job like this from a given 
savepoint?

Many thanks,

John

Sent from my iPhone

Re: Difference between Session Mode and Session Job(Flink Opearator)

2022-07-07 Thread bat man
Awesome, thanks!

On Thu, Jul 7, 2022 at 1:21 PM Gyula Fóra  wrote:

> Hi!
>
> The Flink Kubernetes Operator on a high level supports 3 types of
> resources:
>
>1. Session Deployment : Empty Flink Session cluster
>2. Application Deployment: Flink Application cluster (single job /
>cluster)
>3. Session Job: Flink Job deployed to an existing Session Deployment.
>
> So in other words, the Session deployment only creates the Flink cluster.
> The Session job can be deployed to an existing session deployment and it
> represents an actual Flink job.
>
> I hope that helps :)
> Gyula
>
> On Thu, Jul 7, 2022 at 7:42 AM bat man  wrote:
>
>> Hi,
>>
>> I want to understand the difference between session mode and the new
>> deployment mode, Flink Session Job, which I believe was newly introduced as
>> part of the Flink Operator (1.15) release.
>> What's the benefit of using this mode as opposed to session mode, as both
>> run sessions to which Flink jobs can be submitted?
>>
>> Cheers.
>> H.
>>
>


Setting a timer within broadcast applyToKeyedState() (feature request)

2022-07-07 Thread James Sandys-Lumsdaine
Hello,

I know we can’t set a timer in processBroadcastElement() of the
KeyedBroadcastProcessFunction because there is no key.

However, there is a context.applyToKeyedState() method which allows us to
iterate over the keyed state in the scope of a key. So would it be possible to add
access to the TimerService to the Context parameter passed into that
delegate?

Since the code running in the applyToKeyedState() method is scoped to a key, we
should be able to set up timers for that key too.
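
In the meantime, here is a minimal sketch of one possible workaround (not the
requested feature), assuming the 1.15 DataStream API: stash the desired timer
timestamp into keyed state from processBroadcastElement() via
applyToKeyedState(), and register the real timer the next time an element for
that key arrives in processElement(). The Event/Rule types, the fireAt field
and the state names are illustrative.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical input types (illustrative only).
class Event { public String key; }
class Rule { public long fireAt; }   // event time at which the timer should fire

public class DeferredTimerFunction
        extends KeyedBroadcastProcessFunction<String, Event, Rule, String> {

    private final ValueStateDescriptor<Long> pendingDesc =
            new ValueStateDescriptor<>("pendingTimer", Long.class);

    private transient ValueState<Long> pendingTimer;

    @Override
    public void open(Configuration parameters) {
        pendingTimer = getRuntimeContext().getState(pendingDesc);
    }

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out) throws Exception {
        // No TimerService here, but we can record the request in every key's state.
        ctx.applyToKeyedState(pendingDesc, (key, state) -> state.update(rule.fireAt));
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        Long fireAt = pendingTimer.value();
        if (fireAt != null) {
            // Now we are in a keyed context, so the timer can actually be registered.
            ctx.timerService().registerEventTimeTimer(fireAt);
            pendingTimer.clear();
        }
        // ... normal per-element processing ...
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
    }
}

The obvious limitation is that the timer is only registered once the next
keyed element arrives, which is exactly why direct TimerService access inside
applyToKeyedState() would be nicer.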

Thanks,

James. 

Re: Difference between Session Mode and Session Job(Flink Opearator)

2022-07-07 Thread Gyula Fóra
Hi!

The Flink Kubernetes Operator on a high level supports 3 types of resources:

   1. Session Deployment : Empty Flink Session cluster
   2. Application Deployment: Flink Application cluster (single job /
   cluster)
   3. Session Job: Flink Job deployed to an existing Session Deployment.

So in other words, the Session deployment only creates the Flink cluster.
The Session job can be deployed to an existing session deployment and it
represents an actual Flink job.

I hope that helps :)
Gyula

On Thu, Jul 7, 2022 at 7:42 AM bat man  wrote:

> Hi,
>
> I want to understand the difference between session mode and the new
> deployment mode, Flink Session Job, which I believe was newly introduced as
> part of the Flink Operator (1.15) release.
> What's the benefit of using this mode as opposed to session mode, as both
> run sessions to which Flink jobs can be submitted?
>
> Cheers.
> H.
>