(no subject)
When defining a state variable in a Flink process function, for example `private ValueState vs;`, does this state variable need to be marked transient, and why? More generally, in which cases does a variable in Flink code need the transient modifier, and in which cases is it unnecessary? Any guidance would be appreciated.
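Although no reply appears in this thread, the heart of the question is plain Java serialization semantics, which can be demonstrated without Flink: transient fields are skipped during serialization and come back as null/zero after deserialization. Flink ships function objects to task managers by serializing them, and a ValueState handle is obtained at runtime in open() rather than shipped with the function, which is why such fields are conventionally declared transient. A minimal framework-free sketch; `MyFunction` and `runtimeState` are hypothetical stand-ins, not Flink API:

```java
import java.io.*;

public class TransientDemo {
    // Stand-in for a rich function that Flink would serialize and ship to task managers.
    static class MyFunction implements Serializable {
        int configuredLimit = 10;          // plain serializable field: survives serialization
        transient Object runtimeState;     // runtime handle: skipped, restored as null

        void open() {
            // In Flink you would call getRuntimeContext().getState(descriptor) here.
            runtimeState = new Object();
        }
    }

    static MyFunction roundTrip(MyFunction f) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(f);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (MyFunction) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MyFunction f = new MyFunction();
        f.open();
        MyFunction shipped = roundTrip(f);
        System.out.println(shipped.configuredLimit);       // 10
        System.out.println(shipped.runtimeState == null);  // true
    }
}
```

So the usual pattern is `private transient ValueState<Long> vs;` initialized in open(); conversely, simple serializable configuration fields need no transient modifier, because Flink is expected to ship them with the function.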
Re: Async IO operator to write to DB
Thanks Martijn, the documentation for Async IO was also indicating the same, and that's what prompted me to post this question here.
~
Karthik

On Mon, Jun 12, 2023 at 7:45 PM Martijn Visser wrote:
> Hi Karthik,
>
> In my opinion, it makes more sense to use a sink to leverage Scylla over
> using Async IO. The primary use case for Async IO is enrichment, not for
> writing to a sink.
>
> Best regards,
>
> Martijn
Flink bulk and record file source format metrics
Hello,

I am using the Flink record stream format file source API as below for reading Parquet records:

FileSource.FileSourceBuilder source = FileSource.forRecordStreamFormat(streamformat, path);
source.monitorContinuously(Duration.ofMillis(1));

I want to log/generate metrics for corrupt records, and for that I need to record Flink metrics at the source level, inside the Parquet reader class. Is there any way to do that, given that no handle to the SourceContext is currently available there?

Rgds,
Kamal
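Until a metric group handle is wired into the reader, one common workaround is to count corrupt records at the point where parsing fails and expose that counter however the surrounding code allows. The sketch below is deliberately framework-free: a LongAdder stands in for Flink's Counter, and the parse step is a toy substitute for real Parquet decoding. In an actual SourceReader the counter would instead be registered through the reader context's metric group — treat that wiring as an assumption to verify against your Flink version:

```java
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;

public class CorruptRecordCounter {
    // Stand-in for a Flink Counter metric.
    static final LongAdder corruptRecords = new LongAdder();

    // Stand-in for the per-record parse step inside a Parquet reader.
    static Optional<Integer> parse(String raw) {
        try {
            return Optional.of(Integer.parseInt(raw));
        } catch (NumberFormatException e) {
            corruptRecords.increment();  // update the metric instead of failing the job
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        for (String raw : new String[]{"1", "oops", "3", "???"}) {
            parse(raw);
        }
        System.out.println(corruptRecords.sum());  // 2
    }
}
```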
Re: Flink SQL job metric names are too long and blow up Prometheus memory
hi casel

1. Consider using Flink 1.15, which supports simplified operator names: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/#table-exec-simplify-operator-name-enabled
2. Flink also provides a REST API for fetching instantaneous metrics directly, if you do not need historical metrics: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobmanager-metrics

Best,
Feng

On Tue, Jun 13, 2023 at 8:51 AM casel.chen wrote:
> We run 200+ Flink SQL jobs in production. After hooking them up to Prometheus metrics (Prometheus scrapes job metrics periodically), Prometheus ran out of memory (64 GB) shortly after starting; investigation showed overly long metric names were the cause.
> A Flink SQL job's metric name is generally composed of the job name plus the operator name, and the operator name is assembled from the SQL text, so with many SELECT fields or complex SQL, very long names are easily generated.
> Is there a good way to solve this?
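For reference, the Flink 1.15+ option Feng points to can be set per job in the SQL client; the value is made explicit here as a config fragment:

```sql
-- Shorten the generated operator names for SQL jobs (Flink 1.15+).
SET 'table.exec.simplify-operator-name-enabled' = 'true';
```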
Flink SQL job metric names are too long and blow up Prometheus memory
We run 200+ Flink SQL jobs in production. After hooking them up to Prometheus metrics (Prometheus scrapes job metrics periodically), Prometheus ran out of memory (64 GB) shortly after starting; investigation showed overly long metric names were the cause.

A Flink SQL job's metric name is generally composed of the job name plus the operator name, and the operator name is assembled from the SQL text, so with many SELECT fields or complex SQL, very long names are easily generated.

Is there a good way to solve this?
Re: Re: Latency metrics for each operator of a Flink job
Thanks for the explanation. If it is a Flink SQL job, how do I obtain the latency metric for each operator in the job?

Also, when scraping job metrics with Prometheus: since each operator name in a Flink SQL job is assembled from the SQL text, the names can get very long, and pushing such operator metrics straight into Prometheus blows up its memory. Is there a good way to solve that?

On 2023-06-12 18:01:11, "Hangxiang Yu" wrote:
> The [.[.]]..latency metric should meet this need? Different granularities can also be configured.
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#io
>
> On Mon, Jun 12, 2023 at 5:05 PM casel.chen wrote:
>> I want to measure the latency of data as it passes through each operator of a Flink job. Can this be done today with the open-source community edition?
>
> --
> Best,
> Hangxiang.
Re: Building Dynamic SQL using contents of Map datastructure
Hi Yogesh,

I think you need to build the dynamic SQL statement in your service and then submit the SQL to the Flink cluster.

Best,
Shammon FY

On Mon, Jun 12, 2023 at 9:15 PM Yogesh Rao wrote:
> Hi,
>
> Is there a way we can build a dynamic SQL statement in Flink from the contents of a Map?
>
> Essentially trying to achieve something like below
>
> StringBuilder builder = new StringBuilder("INSERT INTO sampleSink SELECT ");
> builder.append("getColumnsFromMap(dataMap), ");
> builder.append(" FROM Data").toString();
>
> Here getColumnsFromMap is registered as a ScalarFunction which would return
> an array of Strings, basically column names.
>
> Regards,
> -Yogesh
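As Shammon says, the column list has to be resolved to a plain string in the submitting service before the statement reaches the planner — a scalar function cannot expand into a column list at parse time. A minimal sketch; `sampleSink` and `Data` come from the question, while the map layout is an assumption:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DynamicSqlBuilder {
    // Build the INSERT statement from the map's keys before submitting it to Flink.
    static String buildInsert(Map<String, Object> dataMap) {
        String columns = String.join(", ", dataMap.keySet());
        return "INSERT INTO sampleSink SELECT " + columns + " FROM Data";
    }

    public static void main(String[] args) {
        Map<String, Object> dataMap = new LinkedHashMap<>();  // keeps column order stable
        dataMap.put("id", 1);
        dataMap.put("name", "a");
        System.out.println(buildInsert(dataMap));
        // INSERT INTO sampleSink SELECT id, name FROM Data
    }
}
```

The resulting string would then be passed to something like `tableEnv.executeSql(...)` in the service.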
Running Into Error When Installing Apache Flink Using Python
Hi, I tried to install the apache-flink Python package but got an error message, and I am unsure how to resolve it, since I am able to install other packages like pandas without any issues. Any help would be greatly appreciated.

vagrant@devbox:~$ pip install apache-flink
Defaulting to user installation because normal site-packages is not writeable
Collecting apache-flink
  Using cached apache_flink-1.17.1-cp310-cp310-manylinux1_x86_64.whl (5.7 MB)
Collecting requests>=2.26.0
  Using cached requests-2.31.0-py3-none-any.whl (62 kB)
Collecting apache-beam==2.43.0
  Using cached apache_beam-2.43.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (14.0 MB)
Collecting pemja==0.3.0
  Using cached pemja-0.3.0-cp310-cp310-manylinux1_x86_64.whl (332 kB)
Collecting apache-flink-libraries<1.17.2,>=1.17.1
  Downloading apache-flink-libraries-1.17.1.tar.gz (240.2 MB)
     ━━━╸ 240.2/240.2 MB 19.0 MB/s eta 0:00:01
ERROR: Exception:
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/pip/_vendor/urllib3/response.py", line 438, in _error_catcher
    yield
  File "/usr/lib/python3/dist-packages/pip/_vendor/urllib3/response.py", line 519, in read
    data = self._fp.read(amt) if not fp_closed else b""
  File "/usr/lib/python3/dist-packages/pip/_vendor/cachecontrol/filewrapper.py", line 90, in read
    data = self.__fp.read(amt)
  File "/usr/lib/python3.10/http/client.py", line 465, in read
    s = self.fp.read(amt)
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
  File "/usr/lib/python3.10/ssl.py", line 1274, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/lib/python3.10/ssl.py", line 1130, in read
    return self._sslobj.read(len, buffer)
TimeoutError: The read operation timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/pip/_internal/cli/base_command.py", line 165, in exc_logging_wrapper
    status = run_func(*args)
  File "/usr/lib/python3/dist-packages/pip/_internal/cli/req_command.py", line 205, in wrapper
    return func(self, options, args)
  File "/usr/lib/python3/dist-packages/pip/_internal/commands/install.py", line 339, in run
    requirement_set = resolver.resolve(
  File "/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/resolver.py", line 94, in resolve
    result = self._result = resolver.resolve(
  File "/usr/lib/python3/dist-packages/pip/_vendor/resolvelib/resolvers.py", line 481, in resolve
    state = resolution.resolve(requirements, max_rounds=max_rounds)
  File "/usr/lib/python3/dist-packages/pip/_vendor/resolvelib/resolvers.py", line 373, in resolve
    failure_causes = self._attempt_to_pin_criterion(name)
  File "/usr/lib/python3/dist-packages/pip/_vendor/resolvelib/resolvers.py", line 213, in _attempt_to_pin_criterion
    criteria = self._get_updated_criteria(candidate)
  File "/usr/lib/python3/dist-packages/pip/_vendor/resolvelib/resolvers.py", line 204, in _get_updated_criteria
    self._add_to_criteria(criteria, requirement, parent=candidate)
  File "/usr/lib/python3/dist-packages/pip/_vendor/resolvelib/resolvers.py", line 172, in _add_to_criteria
    if not criterion.candidates:
  File "/usr/lib/python3/dist-packages/pip/_vendor/resolvelib/structs.py", line 151, in __bool__
    return bool(self._sequence)
  File "/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/found_candidates.py", line 155, in __bool__
    return any(self)
  File "/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/found_candidates.py", line 143, in
    return (c for c in iterator if id(c) not in self._incompatible_ids)
  File "/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/found_candidates.py", line 47, in _iter_built
    candidate = func()
  File "/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/factory.py", line 215, in _make_candidate_from_link
    self._link_candidate_cache[link] = LinkCandidate(
  File "/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/candidates.py", line 288, in __init__
    super().__init__(
  File "/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/candidates.py", line 158, in __init__
    self.dist = self._prepare()
  File "/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/candidates.py", line 227, in _prepare
    dist = self._prepare_distribution()
  File "/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/candidates.py", line 299, in _prepare_distribution
    return preparer.prepare_linked_requirement(self._ireq, parallel_builds=True)
  File "/usr/lib/python3/dist-packages/pip/_internal/operations/prepare.py", line 487, in prepare_linked_requirement
    return self._prepare_linked_requirement(req, parallel_builds)
  File
Re: Async IO operator to write to DB
Hi Karthik,

In my opinion, it makes more sense to use a sink to leverage Scylla over using Async IO. The primary use case for Async IO is enrichment, not for writing to a sink.

Best regards,

Martijn

On Mon, Jun 12, 2023 at 4:10 PM Karthik Deivasigamani wrote:
> Thanks Martijn for your response.
> One thing I did not mention was that we are in the process of moving away
> from Cassandra to Scylla and would like to use the Scylla Java Driver for
> the following reason:
>> The Scylla Java driver is shard aware and contains extensions for a
>> tokenAwareHostPolicy. Using this policy, the driver can select a
>> connection to a particular shard based on the shard’s token. As a result,
>> latency is significantly reduced because there is no need to pass data
>> between the shards.
> We were considering writing our own Sink to leverage the Scylla Java Driver
> once the migration is done.
> ~
> Karthik
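The sink approach recommended above can be sketched without any Flink or driver dependency. Here a CompletableFuture-backed map stands in for an async session's write call; in a real SinkFunction you would track in-flight futures and wait for them on checkpoint/close to keep at-least-once semantics. All names (executeAsync, fakeDb) are illustrative, not a real driver API:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class AsyncWriteSketch {
    // Stand-in for an async DB client such as a Cassandra/Scylla session.
    static final Map<String, String> fakeDb = new ConcurrentHashMap<>();

    static CompletableFuture<Void> executeAsync(String key, String value) {
        return CompletableFuture.runAsync(() -> fakeDb.put(key, value));
    }

    public static void main(String[] args) {
        // The sink's invoke(): fire async writes and keep the futures.
        List<CompletableFuture<Void>> inFlight = List.of(
                executeAsync("k1", "v1"),
                executeAsync("k2", "v2"));
        // Flush on checkpoint/close: at-least-once means waiting for in-flight writes.
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        System.out.println(fakeDb.size());  // 2
    }
}
```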
Re: Async IO operator to write to DB
Thanks Martijn for your response.
One thing I did not mention was that we are in the process of moving away from Cassandra to Scylla and would like to use the Scylla Java Driver for the following reason:

> The Scylla Java driver is shard aware and contains extensions for a
> tokenAwareHostPolicy. Using this policy, the driver can select a
> connection to a particular shard based on the shard’s token. As a result,
> latency is significantly reduced because there is no need to pass data
> between the shards.

We were considering writing our own Sink to leverage the Scylla Java Driver once the migration is done.
~
Karthik

On Mon, Jun 12, 2023 at 4:56 PM Martijn Visser wrote:
> Hi,
>
> Why wouldn't you just use the Flink Kafka connector and the Flink
> Cassandra connector for your use case?
>
> Best regards,
>
> Martijn
Building Dynamic SQL using contents of Map datastructure
Hi,

Is there a way we can build a dynamic SQL statement in Flink from the contents of a Map?

Essentially trying to achieve something like below:

StringBuilder builder = new StringBuilder("INSERT INTO sampleSink SELECT ");
builder.append("getColumnsFromMap(dataMap), ");
builder.append(" FROM Data").toString();

Here getColumnsFromMap is registered as a ScalarFunction which would return an array of Strings, basically column names.

Regards,
-Yogesh
Re: Flink source error handling
1. Like a regular Kafka client, so it depends on how you configure it.
2. Yes.
3. It depends on the failure of course, but you could create something like a Dead Letter Queue in case deserialization fails for your incoming message.

Best regards,

Martijn

On Sat, Jun 10, 2023 at 2:03 PM Anirban Dutta Gupta <anir...@indicussoftware.com> wrote:
> Hello,
>
> Thanks for the guidance. We will surely think of moving to a newer version of Flink.
> Just a few followup questions when using KafkaSource. (Sorry if I am being naive in my questions.)
>
> 1. How does KafkaSource handle disruptions with the Kafka broker? Will it keep on trying to connect and subscribe to the broker indefinitely, or will it fail the Flink source after a certain number of retries?
>
> 2. Will there be some log output in the Flink logs while the KafkaSource is trying again to connect to the broker after a disruption?
>
> 3. In case of the source failing, is there a way in the Flink program using the KafkaSource to detect the error and add some error handling mechanism, e.g. sending an alert mail to the stakeholders in case the source fails completely? (Something similar to "ActionRequestFailureHandler" for ElasticsearchSink.)
>
> Many thanks in advance,
> Anirban
>
> On 09-06-2023 20:01, Martijn Visser wrote:
>> Hi,
>>
>> This consumer should not be used. This only occurs in really old and no longer supported Flink versions. You should really upgrade to a newer version of Flink and use the KafkaSource.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Fri, Jun 9, 2023 at 11:05 AM Anirban Dutta Gupta <anir...@indicussoftware.com> wrote:
>>> Hello,
>>>
>>> We are using "FlinkKafkaConsumer011" as a Kafka source consumer for Flink. Please guide on how to implement an error handling mechanism for the following:
>>> 1. If the subscription to the Kafka topic gets lost and the Kafka connection gets disconnected: in this case, is there any mechanism of re-subscribing to the Kafka topic automatically in the program?
>>>
>>> 2. If there is any error in the FetchRecords of the consumer.
>>>
>>> Thanks and Regards,
>>> Anirban
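Martijn's dead-letter-queue suggestion, reduced to its framework-free core: attempt deserialization, and route failures to a separate output instead of failing the job. The names below (mainOutput, deadLetters, the toy integer parse) are illustrative only; in practice the dead letters would go to a separate Kafka topic or side output:

```java
import java.util.ArrayList;
import java.util.List;

public class DeadLetterSketch {
    static final List<Integer> mainOutput = new ArrayList<>();
    static final List<String> deadLetters = new ArrayList<>();  // would be a DLQ topic in practice

    static void process(String raw) {
        try {
            mainOutput.add(Integer.parseInt(raw));  // stand-in for real deserialization
        } catch (RuntimeException e) {
            deadLetters.add(raw);                   // poison message: divert, don't fail the job
        }
    }

    public static void main(String[] args) {
        for (String raw : new String[]{"7", "not-a-number", "42"}) {
            process(raw);
        }
        System.out.println(mainOutput);   // [7, 42]
        System.out.println(deadLetters);  // [not-a-number]
    }
}
```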
Re: Fail to run flink 1.17 job with flink-operator 1.5.0 version
Hi!

I think you forgot to upgrade the operator CRD (which contains the updated enum values). Please see:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/upgrade/#1-upgrading-the-crd

Cheers,
Gyula

On Mon, 12 Jun 2023 at 13:38, Liting Liu (litiliu) wrote:
> Hi, I was trying to submit a Flink 1.17 job with flink-kubernetes-operator version v1.5.0,
> but encountered the below exception:
>
> The FlinkDeployment "test-scale-z6t4cd" is invalid: spec.flinkVersion:
> Unsupported value: "v1_17": supported values: "v1_13", "v1_14", "v1_15", "v1_16"
>
> I think the flink-operator should support Flink 1.17, because per
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/custom-resource/autoscaler/
> the autoscaler only works with Flink 1.17.
Fail to run flink 1.17 job with flink-operator 1.5.0 version
Hi, I was trying to submit a Flink 1.17 job with flink-kubernetes-operator version v1.5.0, but encountered the below exception:

The FlinkDeployment "test-scale-z6t4cd" is invalid: spec.flinkVersion: Unsupported value: "v1_17": supported values: "v1_13", "v1_14", "v1_15", "v1_16"

I think the flink-operator should support Flink 1.17, because per https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/custom-resource/autoscaler/ the autoscaler only works with Flink 1.17.
Re: Async IO operator to write to DB
Hi,

Why wouldn't you just use the Flink Kafka connector and the Flink Cassandra connector for your use case?

Best regards,

Martijn

On Mon, Jun 12, 2023 at 12:03 PM Karthik Deivasigamani <karthi...@gmail.com> wrote:
> Hi,
> I have a use case where I need to read messages from a Kafka topic,
> parse them and write them to a database (Cassandra). Since Cassandra supports
> async APIs I was considering using the Async IO operator for my writes. I do
> not need exactly-once semantics for my use-case.
> Is it okay to leverage the Async IO operator as a Sink (writing data into
> a DB)?
> ~
> Karthik
Async IO operator to write to DB
Hi,

I have a use case where I need to read messages from a Kafka topic, parse them, and write them to a database (Cassandra). Since Cassandra supports async APIs, I was considering using the Async IO operator for my writes. I do not need exactly-once semantics for my use case.

Is it okay to leverage the Async IO operator as a Sink (writing data into a DB)?
~
Karthik
Re: Latency metrics for each operator of a Flink job
The [.[.]]..latency metric should meet this need? Different granularities can also be configured.
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#io

On Mon, Jun 12, 2023 at 5:05 PM casel.chen wrote:
> I want to measure the latency of data as it passes through each operator of a Flink job. Can this be done today with the open-source community edition?

--
Best,
Hangxiang.
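The latency metric referred to above is only emitted when latency tracking is enabled; the relevant flink-conf.yaml keys are shown below as a config fragment (the values are examples to tune, not recommendations):

```yaml
# Emit latency markers every 30 s (0, the default, disables latency tracking).
metrics.latency.interval: 30000
# Aggregate per operator; alternatives: single, subtask.
metrics.latency.granularity: operator
```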
Latency metrics for each operator of a Flink job
I want to measure the latency of data as it passes through each operator of a Flink job. Can this be done today with the open-source community edition?
Re: Unsubscribe
Please send an email to user-unsubscr...@flink.apache.org to unsubscribe.

Best,
Hang

Yu voidy wrote on Mon, Jun 12, 2023 at 11:39:
Re: Reply: Flink job latency metrics
Mine is a Flink SQL job; how would I implement the approach you describe for SQL? I also see that Alibaba Cloud's real-time compute platform (VVR) displays a job latency metric, and I'd like to know how that is implemented.

On 2023-06-08 16:46:34, "17610775726" <17610775...@163.com> wrote:
> Hi
>
> All the monitoring you mentioned can be alerted on via metrics. As for LatencyMarker: since it does not account for the time spent inside operators' processing logic, the metric is not exact; but when a job is backpressured, LatencyMarkers are also blocked, so it can still roughly reflect job latency. To compute end-to-end latency precisely, you can take a start timestamp when consuming from Kafka and an end timestamp at the sink, then report the difference through a custom metric and build end-to-end latency monitoring on it.
>
> Best
> JasonLee
>
> ---- Original message ----
> From | casel.chen
> Date | Jun 8, 2023 16:39
> To | user-zh@flink.apache.org
> Subject | Flink job latency metrics
> I'd like to know which metric tells me how far behind a Flink job currently is. I want to set up latency alerts to reflect job health: whether there is backpressure, whether more resources are needed, etc.
> Take real-time sync from a MySQL table to a Doris table as an example: mysql binlog -> kafka -> flink -> doris
> Latency metrics include:
> 1. Business latency: business latency = current system time - event time of the last record the system processed.
> For example: time a Kafka message is written to Doris - time the message's data was originally produced (e.g., when the MySQL row was updated).
> 2. Dwell latency: dwell time = time the data enters the streaming job - event time of the data.
> For example: time Flink consumes the Kafka message - time the message's data was originally produced.
>
> Currently we approximate this with Kafka consumer-group lag alerts, but that number is inaccurate: first, because Flink only commits offsets on checkpoints; second, because traffic differs across the day, so alerts are not timely during low-traffic periods.
> The docs mention a LatencyMarker that can be enabled; after enabling it, how do I observe latency? Does this metric have to be reported to Prometheus before I can read it?
>
> Another problem we hit: metric names generated for Flink SQL jobs are very long, because the operatorId is generated from the SQL text, so Prometheus keeps getting blown up. Is there any way to solve this?
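JasonLee's suggestion amounts to carrying an ingestion timestamp with each record and reporting the difference at the sink. A framework-free sketch of that arithmetic follows; in a real DataStream job the value would be exposed via a gauge registered on the RuntimeContext metric group, and the difficulty for pure SQL jobs is precisely that there is no standard place to register such a gauge without a custom connector or UDF (that remains the open question here):

```java
public class EndToEndLatency {
    // Record carrying the timestamp taken when it was read from Kafka.
    static class Event {
        final long ingestTimeMs;
        Event(long ingestTimeMs) { this.ingestTimeMs = ingestTimeMs; }
    }

    static volatile long lastLatencyMs;  // the value a metric gauge would expose

    // Sink-side step: compute end-to-end latency for the record just written.
    static long recordLatency(Event e, long nowMs) {
        lastLatencyMs = nowMs - e.ingestTimeMs;
        return lastLatencyMs;
    }

    public static void main(String[] args) {
        Event e = new Event(1_000L);
        System.out.println(recordLatency(e, 1_750L));  // 750
    }
}
```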