Re: Can Flink state be written to the log when it expires?
You could print it yourself via a side output, or recompute it as a supplement. haishui wrote on Mon, Jun 27, 2022 at 14:10: > > Hi, > When Flink state expires, can it invoke a callback the way Caffeine cache does, so that the expired contents are printed to the log? > > Best Regards!
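Flink's state TTL currently offers no expiry callback, so the suggestion above (a side output, or recomputing) is the practical route: register a timer per key and emit the value before clearing it. As a plain-Python illustration of the Caffeine-style semantics the poster is asking for (a toy model, not Flink or Caffeine API):

```python
import time

class TtlMap:
    """A toy TTL map that fires a callback when an expired entry is evicted.

    This mimics Caffeine's removal-listener semantics; in Flink you would
    instead register a timer per key and emit the value to a side output
    before clearing the state.
    """

    def __init__(self, ttl_seconds, on_expire, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.on_expire = on_expire   # callback(key, value) invoked on eviction
        self.clock = clock           # injectable clock, so tests are deterministic
        self._data = {}              # key -> (value, write_time)

    def put(self, key, value):
        self._data[key] = (value, self.clock())

    def get(self, key):
        self._evict_expired()
        entry = self._data.get(key)
        return entry[0] if entry else None

    def _evict_expired(self):
        now = self.clock()
        expired = [k for k, (_, t) in self._data.items() if now - t >= self.ttl]
        for k in expired:
            value, _ = self._data.pop(k)
            self.on_expire(k, value)  # this is where the "log expired entry" hook fires

# Usage: a fake clock makes the eviction deterministic.
current = [0.0]
expired_log = []
m = TtlMap(ttl_seconds=10,
           on_expire=lambda k, v: expired_log.append((k, v)),
           clock=lambda: current[0])
m.put("a", 1)
current[0] = 11.0       # advance past the TTL
assert m.get("a") is None
assert expired_log == [("a", 1)]
```

In a real job the equivalent would be a KeyedProcessFunction holding the value in keyed state, registering a timer at write-time + TTL, and emitting the value to a side output in the timer callback.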
Re: Guide for building Flink image with Python doesn't work
Hi Gyula, From the log we can see that you downloaded the source package of pemja, not the wheel package of pemja[1]. I guess you are using an M1 machine. If you install pemja from the source package, you need to have a JDK, gcc tools and CPython with NumPy in the environment. I believe this can be solved once you have prepared those tools, but other dependencies of PyFlink 1.15 do not support M1, which makes PyFlink 1.15 impossible to install and use on M1. We have added M1 support in release 1.16[2]. If many M1 users have a strong demand for PyFlink 1.15, I think we need to consider whether it is necessary to backport this support to 1.15, which would strain our compatibility guarantees between minor versions. Best, Xingbo [1] https://pypi.org/project/pemja/0.1.4/ [2] https://issues.apache.org/jira/browse/FLINK-25188 Gyula Fóra wrote on Wed, Jul 6, 2022 at 13:56: > Here it is, copied from the docs essentially: > > FROM flink:1.15.0 > > > # install python3: Debian 11 updated Python to 3.9, so install Python 3.7 from source > # (PyFlink currently only officially supports Python 3.6, 3.7 and 3.8). > > RUN apt-get update -y && \ > apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev > libffi-dev git && \ > wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \ > tar -xvf Python-3.7.9.tgz && \ > cd Python-3.7.9 && \ > ./configure --without-tests --enable-shared && \ > make -j6 && \ > make install && \ > ldconfig /usr/local/lib && \ > cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \ > ln -s /usr/local/bin/python3 /usr/local/bin/python && \ > apt-get clean && \ > rm -rf /var/lib/apt/lists/* > > # install PyFlink > RUN pip3 install apache-flink==1.15.0 > > And I am running: > docker build --tag pyflink:latest . > > This gives the following error: > > > *#6 64.12 cwd: /tmp/pip-install-9_farey_/pemja/#6 64.12 > Complete output (1 lines):#6 64.12 Include folder should be at > '/usr/local/openjdk-11/include' but doesn't exist.
Please check you've > installed the JDK properly.* > > A side note: > The Dockerfile in the docs is missing git, so initially I got the following > error: > > *#7 57.73 raise OSError("%r was not found" % name)#7 57.73 > OSError: 'git' was not found * > > @Weihua Hu can you please send your working > Dockerfile? > > Gyula > > On Wed, Jul 6, 2022 at 4:16 AM Weihua Hu wrote: > >> Hi Gyula, >> >> I can build the PyFlink image successfully by following this guide. Did you >> add a dependency outside of the documentation? Could you provide your >> Dockerfile? >> >> >> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker >> >> Best, >> Weihua >> >> >> On Tue, Jul 5, 2022 at 11:40 PM Gyula Fóra wrote: >> >>> Well in any case either the official image is incorrect (maybe we should >>> include the JDK by default, not the JRE) or we should update the >>> documentation regarding the Python docker build, because it simply doesn't >>> work at the moment. >>> >>> I am still looking for a full working example that adds the required >>> Python packages on top of a Flink 1.15.0 base image :) >>> >>> Gyula >>> >>> On Tue, Jul 5, 2022 at 5:36 PM Weihua Hu wrote: >>> In addition, you can try providing the Dockerfile Best, Weihua On Tue, Jul 5, 2022 at 11:24 PM Weihua Hu wrote: > Hi, > > The base image flink:1.15.0 is built from openjdk:11-jre, and this > image only installs the JRE but not the JDK. > It looks like the package you want to install (pemja) depends on the JDK. > You need to install openjdk-11-jdk in the Dockerfile; > take a look at how it is installed in the official image: > > > https://hub.docker.com/layers/openjdk/library/openjdk/11-jdk/images/sha256-bc0af19c7c4f492fe6ff0c1d1c8c0e5dd90ab801385b220347bb91dbe2b4094f?context=explore > > > Best, > Weihua > > > On Tue, Jul 5, 2022 at 3:50 PM Gyula Fóra > wrote: > >> Hi All!
>> >> I have been trying to experiment with the Flink python support on >> Kubernetes but I got stuck creating a custom image with all the necessary >> python libraries. >> >> I found this guide in the docs: >> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker >> >> However when I try to build a custom image using it, I get the >> following error: >> >> #7 131.7 Collecting pemja==0.1.4 >> #7 131.8 Downloading pemja-0.1.4.tar.gz (32 kB) >> #7 131.9 ERROR: Command errored out with exit status 255: >> #7 131.9 command: /usr/local/bin/python3.7 -c 'import sys, >> setuptools, tokenize; sys.argv[0] = >> '"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"'; >> __file__='"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"';f=getattr(tokenize, >> '"'"'open'"'"',
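Summarizing the fixes that surfaced in this thread: the base image ships only a JRE, while pemja builds from source and needs a full JDK, plus git for the pip build. A sketch of the documented Dockerfile with those two changes applied (untested; the apt package name `openjdk-11-jdk` and the Debian JDK path used for `JAVA_HOME` are assumptions):

```dockerfile
FROM flink:1.15.0

# pemja builds from source on some platforms and needs a full JDK, not the
# JRE bundled in the base image; git is needed by the pip build as well.
RUN apt-get update -y && \
    apt-get install -y openjdk-11-jdk git build-essential libssl-dev \
        zlib1g-dev libbz2-dev libffi-dev && \
    wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
    tar -xvf Python-3.7.9.tgz && \
    cd Python-3.7.9 && \
    ./configure --without-tests --enable-shared && \
    make -j6 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
    ln -s /usr/local/bin/python3 /usr/local/bin/python && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# pemja locates JNI headers via JAVA_HOME; point it at the Debian JDK
# (assumed install location for openjdk-11-jdk on amd64).
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64

# install PyFlink
RUN pip3 install apache-flink==1.15.0
```

The reported error ("Include folder should be at '/usr/local/openjdk-11/include' but doesn't exist") is consistent with pemja resolving headers under `JAVA_HOME`, hence the explicit `ENV` above.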
Configure a kafka source dynamically (???)
When using the Kafka connector, you need to set the topics in advance (by giving a list of them or a regex pattern for the topic names). Imagine a situation where the topics are not known in advance; of course you could use an all-pass regex pattern to match all the topics in the broker, but what I want to know is whether it's possible to read from new topics on demand. E.g., initially the source starts without any topics to read from, so nothing is read until it gets a control message (which could be pushed to a control topic, for example) specifying the set of topics to subscribe to. I guess this could be somehow implemented using custom subscribers once this issue is merged/closed: https://issues.apache.org/jira/browse/FLINK-24660 but would it be possible to achieve this objective without having to periodically poll the broker, e.g., in a more reactive (push-based) way? I guess if the Kafka source (or any other source for that matter) were to have a control signal like that, then it would be more of an operator than a source, really... Salva PS: Does anyone know the current state of FLINK-24660? The changes seem to have been ready to merge for a while.
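Until FLINK-24660 lands there is no source-level hook for this, but the pattern being described, a subscription set driven by control messages, can be modeled independently of Kafka. A toy sketch in plain Python (illustrative only; none of this is Flink or Kafka API):

```python
class DynamicSubscription:
    """Tracks the set of topics to read, driven by control messages.

    In Flink terms this is the role a broadcast control stream would play:
    data records are only forwarded if their topic is currently subscribed,
    so the job starts reading nothing until a control message arrives.
    """

    def __init__(self):
        self.topics = set()

    def on_control(self, msg):
        # msg is assumed to look like {"add": [...], "remove": [...]}
        self.topics |= set(msg.get("add", []))
        self.topics -= set(msg.get("remove", []))

    def on_record(self, topic, record):
        # Forward only records from currently subscribed topics.
        return record if topic in self.topics else None

sub = DynamicSubscription()
assert sub.on_record("orders", "r1") is None      # nothing subscribed yet
sub.on_control({"add": ["orders"]})
assert sub.on_record("orders", "r1") == "r1"
sub.on_control({"remove": ["orders"]})
assert sub.on_record("orders", "r2") is None
```

Note the trade-off the poster identifies: with an all-pass topic pattern plus this filter, the source still physically reads every topic; a truly push-driven source would need the control signal inside the source itself.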
Re: [ANNOUNCE] Apache Flink 1.15.1 released
Thanks a lot for being our release manager David and everyone who contributed. Best, Xingbo David Anderson wrote on Fri, Jul 8, 2022 at 06:18: > The Apache Flink community is very happy to announce the release of Apache > Flink 1.15.1, which is the first bugfix release for the Apache Flink 1.15 > series. > > Apache Flink® is an open-source stream processing framework for > distributed, high-performing, always-available, and accurate data streaming > applications. > > The release is available for download at: > https://flink.apache.org/downloads.html > > Please check out the release blog post for an overview of the improvements > for this bugfix release: > > https://flink.apache.org/news/2022/07/06/release-1.15.1.html > > The full release notes are available in Jira: > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351546 > > We would like to thank all contributors of the Apache Flink community who > made this release possible! > > Regards, > David Anderson >
Re: PyFlink: restoring from savepoint
Hi John, Could you provide more information, e.g. the exact command used to submit the job, the log files, the PyFlink version, etc.? Regards, Dian On Thu, Jul 7, 2022 at 7:53 PM John Tipper wrote: > Hi all, > > I have a PyFlink job running in Kubernetes. Savepoints and checkpoints are > being successfully saved to S3. However, I am unable to get the job to > start from a savepoint. > > The container is started with these args: > > “standalone-job”, “-pym”, “foo.main”, “-s”, “s3://”, > “-n” > > In the JM logs I can see “Starting > StandaloneApplicationClusterEntrypoint…” where my arguments are listed. > > However, I don’t see any restore occurring in the logs and my application > restarts with no state. How do I start a PyFlink job like this from a given > savepoint? > > Many thanks, > > John > > Sent from my iPhone
[ANNOUNCE] Apache Flink 1.15.1 released
The Apache Flink community is very happy to announce the release of Apache Flink 1.15.1, which is the first bugfix release for the Apache Flink 1.15 series. Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications. The release is available for download at: https://flink.apache.org/downloads.html Please check out the release blog post for an overview of the improvements for this bugfix release: https://flink.apache.org/news/2022/07/06/release-1.15.1.html The full release notes are available in Jira: https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351546 We would like to thank all contributors of the Apache Flink community who made this release possible! Regards, David Anderson
Re: Restoring a job from a savepoint
Thank you all, that’s very helpful. It looks like there’s something else that’s causing my cluster to not load my savepoints, so I’ve submitted a separate query for that. Many thanks, John Sent from my iPhone On 6 Jul 2022, at 21:24, Alexander Fedulov wrote: Hi John, use $ bin/flink run -s s3://my_bucket/path/to/savepoints/ (no trailing slash, including the scheme), where the directory should contain a valid _metadata file. You should see logs like this: INFO o.a.f.r.c.CheckpointCoordinator [] - Starting job foobar from savepoint s3://my_bucket/path/to/savepoints/ () INFO o.a.f.r.c.CheckpointCoordinator [] org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job foobar from Savepoint 1 @ 0 for foobar located at s3://my_bucket/path/to/savepoints/. The indication of a correct restore should be the absence of exceptions. You might see messages like this one for operators that did not have any state in the savepoint: INFO o.a.f.r.c.CheckpointCoordinator [] - Skipping empty savepoint state for operator a0f11f7a2c416beb6b7aed14be0d63ca. Best, Alexander Fedulov On Wed, Jul 6, 2022 at 9:50 PM John Tipper <john_tip...@hotmail.com> wrote: Hi all, The docs on restoring a job from a savepoint (https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#resuming-from-savepoints) state that the syntax is: $ bin/flink run -s :savepointPath [:runArgs] and where "you may give a path to either the savepoint’s directory or the _metadata file." If I am using S3 as my store of state: state.savepoints.dir: s3://my_bucket/path/to/savepoints and an example savepoint is at: s3://my_bucket/path/to/savepoints//_metadata then what am I supposed to supply to the flink run command? Is it: 1. The full path including filesystem: s3://my_bucket/path/to/savepoints//_metadata or s3://my_bucket/path/to/savepoints/ 2. or the full path: my_bucket/path/to/savepoints//_metadata or my_bucket/path/to/savepoints/ 3.
or the path relative to the savepoints directory: /_metadata or If I supply a directory, do I need to specify a trailing slash? Also, is there anything that I will see in the logs that will indicate that the restore from a savepoint has been successful? Many thanks, John
Re: Re: Flink interval join watermark question
Your code isn't formatted, which makes it hard to read. First, everything we are discussing is the event-time case; in the processing-time case the watermark is not used. Suppose a Record represents one request; then the time the request occurred can be used as the Record's event time, i.e. the Record's timestamp. Now, to compute per-5-minute PV statistics, the Record's timestamp determines which window it is assigned to, say the MMdd 8:30 ~ MMdd 8:35 window. Since data can arrive late, at any moment, even at 8:40, a record stamped 8:33 may still show up. But Flink cannot wait indefinitely; at some point the 8:35 window must close and emit its aggregate downstream. With the EventTimeTrigger, the window closes when the watermark reaches the window's maxTimestamp, which for this window is the 8:35 mark. The watermark itself is generated according to the strategy the user chose; for example, at the source, the watermark is the largest Record timestamp the current task has seen minus a maxOutOfOrderness, where maxOutOfOrderness is the degree of out-of-orderness you tolerate. As for watermark propagation, in short it is simply broadcast downstream. In the two-stream join case you take the minimum, i.e. the smaller of the two watermarks. For watermarks arriving on the same stream you take the maximum (if the logic is correct, the producing side already guarantees monotonic increase, and the check on the receiving side is just a safeguard against the watermark going backwards). lxk wrote on Thu, Jul 7, 2022 at 19:57:
>
> By that account, timestamp and watermark are actually unrelated.
> But I saw a post saying that a two-stream join stores a MapState.
> Both StreamRecord and Watermark inherit from StreamElement; Flink replaces the Timestamp in the StreamRecord object, and if the Watermark generated from the current event's Timestamp is greater than the previous Watermark, a new Watermark is emitted.
> The relevant code is in TimestampsAndPunctuatedWatermarksOperator.processElement:
>
> @Override
> public void processElement(StreamRecord<T> element) throws Exception {
>     final T value = element.getValue();
>     // call the user-implemented extractTimestamp to obtain the new timestamp
>     final long newTimestamp = userFunction.extractTimestamp(value,
>             element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
>     // replace the old timestamp in the StreamRecord with the new one
>     output.collect(element.replace(element.getValue(), newTimestamp));
>     // call the user-implemented checkAndGetNextWatermark to get the next watermark
>     final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
>     // if the next watermark is greater than the current one, emit the new watermark
>     if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
>         currentWatermark = nextWatermark.getTimestamp();
>         output.emitWatermark(nextWatermark);
>     }
> }
>
> That is the relevant content from the post I saw.
> If the account above is wrong, how does the watermark flow in a two-stream join?
>
> On 2022-07-07 10:03:00, "yidan zhao" wrote:
> >The timestamp is a value assigned to each element (each input record).
> >The watermark is generated at the source and then propagated downstream.
> >
> >Take windowing as an example: a record's timestamp determines which window the record is assigned to.
> >The watermark determines when a window closes; for example, for a 0-5s window, the window closes when the watermark reaches 5s.
> >
> >Since data may not arrive on time, you can set watermark = max(timestamp) - 30s; the 30s is the degree of out-of-orderness tolerated.
> >
> >lxk wrote on Wed, Jul 6, 2022 at 13:36:
> >>
> >> I have some questions about using interval join and hope someone can help answer them.
> >> https://pic.imgdb.cn/item/62c5015b5be16ec74ac2b23f.png
> >> The official docs say the larger of the timestamps from the two streams is taken, and the source code confirms this.
> >> https://pic.imgdb.cn/item/62c51e8b5be16ec74ae22880.png
> >> My questions are:
> >> 1. What is the difference between timestamp and watermark here?
> >> 2. How is the watermark computed in an interval join? After taking the larger of the two streams' timestamps, is the watermark related in some way to that maximum timestamp?
> >
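The rule discussed in this thread, take the maximum within one stream but the minimum across the two joined streams, can be captured in a few lines. A toy model in plain Python of the watermark bookkeeping a two-input operator performs (illustrative only, not actual Flink API):

```python
class TwoInputWatermarkTracker:
    """Tracks the combined watermark of a two-input operator.

    Per input, the watermark only moves forward (max with the previous
    value); the operator's output watermark is the min of the two inputs,
    and it is only emitted when it actually advances.
    """

    def __init__(self):
        self.wm = [float("-inf"), float("-inf")]  # per-input watermarks
        self.emitted = float("-inf")              # last combined watermark emitted

    def advance(self, input_index, watermark):
        # Within one stream: never go backwards (the "take the max" rule).
        self.wm[input_index] = max(self.wm[input_index], watermark)
        # Across streams: the combined watermark is the minimum.
        combined = min(self.wm)
        if combined > self.emitted:
            self.emitted = combined
            return combined  # would be forwarded downstream
        return None

t = TwoInputWatermarkTracker()
assert t.advance(0, 100) is None      # input 1 still at -inf, min unchanged
assert t.advance(1, 50) == 50         # min(100, 50) advances to 50
assert t.advance(1, 40) is None       # a regressing watermark on one input is ignored
assert t.advance(1, 120) == 100       # min(100, 120) = 100
```

This also answers the original question in spirit: the per-record timestamp feeds the watermark generator, but once generated, watermarks advance by this min/max bookkeeping independently of any single record.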
Re:Re: Flink interval join watermark question
By that account, timestamp and watermark are actually unrelated. But I saw a post saying that a two-stream join stores a MapState. Both StreamRecord and Watermark inherit from StreamElement; Flink replaces the Timestamp in the StreamRecord object, and if the Watermark generated from the current event's Timestamp is greater than the previous Watermark, a new Watermark is emitted. The relevant code is in TimestampsAndPunctuatedWatermarksOperator.processElement:

@Override
public void processElement(StreamRecord<T> element) throws Exception {
    final T value = element.getValue();
    // call the user-implemented extractTimestamp to obtain the new timestamp
    final long newTimestamp = userFunction.extractTimestamp(value,
            element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
    // replace the old timestamp in the StreamRecord with the new one
    output.collect(element.replace(element.getValue(), newTimestamp));
    // call the user-implemented checkAndGetNextWatermark to get the next watermark
    final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
    // if the next watermark is greater than the current one, emit the new watermark
    if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
        currentWatermark = nextWatermark.getTimestamp();
        output.emitWatermark(nextWatermark);
    }
}

That is the relevant content from the post I saw. If the account above is wrong, how does the watermark flow in a two-stream join?
On 2022-07-07 10:03:00, "yidan zhao" wrote:
>The timestamp is a value assigned to each element (each input record).
>The watermark is generated at the source and then propagated downstream.
>
>Take windowing as an example: a record's timestamp determines which window the record is assigned to.
>The watermark determines when a window closes; for example, for a 0-5s window, the window closes when the watermark reaches 5s.
>
>Since data may not arrive on time, you can set watermark = max(timestamp) - 30s; the 30s is the degree of out-of-orderness tolerated.
>
>lxk wrote on Wed, Jul 6, 2022 at 13:36:
>>
>> I have some questions about using interval join and hope someone can help answer them.
>> https://pic.imgdb.cn/item/62c5015b5be16ec74ac2b23f.png
>> The official docs say the larger of the timestamps from the two streams is taken, and the source code confirms this.
>> https://pic.imgdb.cn/item/62c51e8b5be16ec74ae22880.png
>> My questions are:
>> 1. What is the difference between timestamp and watermark here?
>> 2. How is the watermark computed in an interval join? After taking the larger of the two streams' timestamps, is the watermark related in some way to that maximum timestamp?
PyFlink: restoring from savepoint
Hi all, I have a PyFlink job running in Kubernetes. Savepoints and checkpoints are being successfully saved to S3. However, I am unable to get the job to start from a savepoint. The container is started with these args: “standalone-job”, “-pym”, “foo.main”, “-s”, “s3://”, “-n” In the JM logs I can see “Starting StandaloneApplicationClusterEntrypoint…” where my arguments are listed. However, I don’t see any restore occurring in the logs and my application restarts with no state. How do I start a PyFlink job like this from a given savepoint? Many thanks, John Sent from my iPhone
Re: Difference between Session Mode and Session Job (Flink Operator)
Awesome, thanks! On Thu, Jul 7, 2022 at 1:21 PM Gyula Fóra wrote: > Hi! > > The Flink Kubernetes Operator on a high level supports 3 types of > resources: > >1. Session Deployment : Empty Flink Session cluster >2. Application Deployment: Flink Application cluster (single job / >cluster) >3. Session Job: Flink Job deployed to an existing Session Deployment. > > So in other words, the Session deployment only creates the Flink cluster. > The Session job can be deployed to an existing session deployment and it > represents an actual Flink job. > > I hope that helps :) > Gyula > > On Thu, Jul 7, 2022 at 7:42 AM bat man wrote: > >> Hi, >> >> I want to understand the difference between session mode and the new >> deployment mode - Flink Session Job which I believe is newly introduced as >> part of the Flink Operator(1.15) release. >> What's the benefit of using this mode as opposed to session mode as both >> run sessions to which flink jobs can be submitted. >> >> Cheers. >> H. >> >
Setting a timer within broadcast applyToKeyedState() (feature request)
Hello, I know we can’t set a timer in the processBroadcastElement() of the KeyedBroadcastProcessFunction, as there is no key. However, there is a context.applyToKeyedState() method which allows us to iterate over the keyed state in the scope of a key. So would it be possible to add access to the TimerService to the Context parameter passed into that delegate? Since the code running in the applyToKeyedState() method is scoped to a key, we should be able to set up timers for that key too. Thanks, James.
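To make the request concrete, here is a toy model in plain Python of what the proposed API could look like: iterating keyed state hands the callback a per-key context, so registering a timer for that key is well-defined (a hypothetical shape, not the actual Flink Context or TimerService API):

```python
class MiniKeyedRuntime:
    """Toy model of keyed state plus timers, illustrating the feature request.

    While iterating keyed state (as applyToKeyedState does), each entry is
    visited in the scope of one key, so a per-key register_timer callback is
    unambiguous. None of this is Flink API; it only sketches the idea.
    """

    def __init__(self):
        self.state = {}    # key -> value (stand-in for keyed state)
        self.timers = {}   # key -> timestamp of the registered timer

    def apply_to_keyed_state(self, fn):
        # Visit every key, handing fn a per-key timer-registration hook
        # (the role a TimerService on the Context would play).
        for key in list(self.state):
            fn(key, self.state[key],
               lambda ts, k=key: self.timers.__setitem__(k, ts))

rt = MiniKeyedRuntime()
rt.state = {"user-1": 10, "user-2": 20}
# On a broadcast element: set a cleanup timer for every key holding state.
rt.apply_to_keyed_state(lambda key, value, register_timer: register_timer(5000))
assert rt.timers == {"user-1": 5000, "user-2": 5000}
```

The point of the sketch is that the timer registration never happens without a key in scope, which is exactly why the restriction in processBroadcastElement() proper would not apply inside applyToKeyedState().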
Re: Difference between Session Mode and Session Job (Flink Operator)
Hi! The Flink Kubernetes Operator on a high level supports 3 types of resources: 1. Session Deployment : Empty Flink Session cluster 2. Application Deployment: Flink Application cluster (single job / cluster) 3. Session Job: Flink Job deployed to an existing Session Deployment. So in other words, the Session deployment only creates the Flink cluster. The Session job can be deployed to an existing session deployment and it represents an actual Flink job. I hope that helps :) Gyula On Thu, Jul 7, 2022 at 7:42 AM bat man wrote: > Hi, > > I want to understand the difference between session mode and the new > deployment mode - Flink Session Job which I believe is newly introduced as > part of the Flink Operator(1.15) release. > What's the benefit of using this mode as opposed to session mode as both > run sessions to which flink jobs can be submitted. > > Cheers. > H. >
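To make the distinction concrete, a Session Job is just a custom resource that points at an existing session cluster. A sketch of what the two resources might look like (field names per the operator's v1beta1 CRDs as I understand them; the resource names and jarURI are hypothetical):

```yaml
# A Session Deployment: an empty Flink session cluster (no job section).
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-session-cluster          # hypothetical name
spec:
  image: flink:1.15
  flinkVersion: v1_15
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
---
# A Session Job: an actual Flink job submitted to the cluster above.
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: my-session-job              # hypothetical name
spec:
  deploymentName: my-session-cluster     # must match the FlinkDeployment above
  job:
    jarURI: https://example.com/my-job.jar   # hypothetical jar location
    parallelism: 2
```

Several FlinkSessionJob resources can target the same session cluster, which is the multi-job sharing that plain session mode gives you, but with each job managed declaratively as its own resource.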