Fraud detection demo with Flink 1.14

2022-02-14 Thread Pramit Vamsi
Hi, Problem: Watermark does not move within Dynamic Alert Function Implementing ideas (as is) from this article - https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html Code: https://github.com/afedulov/fraud-detection-demo Pipeline: Kafka -> Dynamic Key Function -> Dynamic Alert

flinkcdc:slave with the same server_uuid/server_id as this slave has connected to the master;

2022-02-14 Thread maker_d...@foxmail.com
Flink version: flink-1.13.5 CDC version: 2.1.1 When syncing multiple tables with flinkcdc, I get the following error: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=1) at

Re: TM OOMKilled

2022-02-14 Thread Xintong Song
Hi Alexey, You may want to double check if `state.backend.rocksdb.memory.managed` is configured to `true`. (This should be `true` by default.) Another question that may or may not be related. I noticed that you have configured 128MB task off-heap memory, which IIRC the default should be 0. Could

flink sql jdbc sink transaction commit problem

2022-02-14 Thread casel.chen
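Recently I have been extending the flink sql jdbc connector to support the Phoenix database. While testing and debugging, I found that data can be written via PhoenixStatement.executeBatch(), but because the transaction is never committed, nobody else can see it.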

Re: TM OOMKilled

2022-02-14 Thread Alexey Trenikhun
Hello, We use RocksDB, but there is no problem with the Java heap, which is limited to 3.523gb; the problem is with total container memory. The pod is killed not due to OutOfMemoryError, but because total container memory exceeds 10gb. Thanks, Alexey From: Caizhi Weng

Re: Synchronization across tasks using checkpoint barriers

2022-02-14 Thread Gopi Krishna M
CheckpointedFunction docs mention the following - > The snapshotState(FunctionSnapshotContext) >

Re: jdbc connector: after writing a bad record, when no new data is written, the exception messages keep nesting and are never thrown

2022-02-14 Thread Caizhi Weng
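Hi! The image cannot be displayed; I suggest uploading it to an external image host such as imgur and pasting the link in the email. Although I cannot see the image, from the description this looks like a known issue [1] that nobody has fixed yet. [1] https://issues.apache.org/jira/browse/FLINK-24677 jianjianjianjianjianjianjianjian <724125...@qq.com.invalid> wrote on Mon, Feb 14, 2022 at 15:40: > Hello everyone: > When writing with the jdbc connector, after *writing one bad record* (field value too long), *no more data is written* >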

Re: Question: result data stops updating after setting table.exec.state.ttl

2022-02-14 Thread Caizhi Weng
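Hi! The image cannot be displayed; I suggest uploading it to an external image host such as imgur and then pasting the link in the email. Before you set the state ttl, did the sink data keep updating? Are you sure this is not caused by later data failing some where condition? liangjinghong wrote on Sat, Feb 12, 2022 at 14:39: > Hello, I am new to flink. To manage state, I set configuration.setString("table.exec.state.ttl","12h"); in my code. > > However, after the flink job had run for 12 hours, *my update result table was never updated again*. From the web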

Re: There Is a Delay While Over Aggregation Sending Results

2022-02-14 Thread Caizhi Weng
Hi! Did you define watermark on ts? If yes the result will be produced only after the watermark exceeds its row time, thus causing the delay. See [1] for detail. [1]

Re: TM OOMKilled

2022-02-14 Thread Caizhi Weng
Hi! Heap memory usage depends heavily on your job and your state backend. Which state backend are you using and if possible could you share your user code or explain what operations your job is doing? Alexey Trenikhun wrote on Tue, Feb 15, 2022 at 05:17: > Hello, > We run Flink 1.13.5 job in app mode in

[statefun] Looking for a polyglot example

2022-02-14 Thread casel.chen
Hello, I am looking for a polyglot example of stateful functions to learn how to program functions in different languages and then deploy them together as a unit of an event-driven application. Examples like Fraud Detection, which use python functions for ML while using java functions to process data etc.

Re:Re: [statefun] Add new state failure in Greetings example

2022-02-14 Thread casel.chen
Oh, yes. I forgot to add my newly defined ValueSpec as a parameter to the withValueSpec() method. After adding it, it works; thank you Igal. At 2022-02-15 01:42:39, "Igal Shilman" wrote: Hello, Make sure that you have added the state when creating the function spec like in this example[1]

Re: [External] : Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

2022-02-14 Thread Fuyao Li
Hi Yun, Please ignore my question 2. I think the Sink part is the decisive factor to ensure end to end exactly once. If I want to implement an AT LEAST ONCE sink, which interface should I implement? Maybe

Re: Flink High-Availability and Job-Manager recovery

2022-02-14 Thread Koffman, Noa (Nokia - IL/Kfar Sava)
Hi, thanks for your reply, it was very helpful. we tried to go with the 2nd approach, enabling HA mode, and added these conf values: high-availability: zookeeper high-availability.zookeeper.quorum: zk-noa-edge-infra:2181 high-availability.zookeeper.path.root: /flink

TM OOMKilled

2022-02-14 Thread Alexey Trenikhun
Hello, We run a Flink 1.13.5 job in app mode in Kubernetes, 1 JM and 1 TM. We also have a Kubernetes cron job which takes a savepoint every 2 hours (14 */2 * * *). Once in a while (~1 per 2 days) the TM is OOMKilled; suspiciously, it happens on even hours ~4 minutes after savepoint start (e.g. 12:18, 4:18)

Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

2022-02-14 Thread saravana...@gmail.com
Hi Niklas, Thanks for your reply. Approach [1] works only if operators are chained (in other words, operators executed within the same task). Since the mapPartition operator parallelism is different from the previous operator parallelism, it doesn't fall under the same task (i.e. it is not chained).

Re: [statefun] Add new state failure in Greetings example

2022-02-14 Thread Igal Shilman
Hello, Make sure that you have added the state when creating the function spec like in this example[1] If that wasn't it, can you send your UserFn? [1]

Re: How to access Task.isBackPressured() from a SourceFunction?

2022-02-14 Thread Niklas Semmler
Hi Darren, No, you cannot access the Task from the operator. You can access some metrics via the RuntimeContext. getRuntimeContext().getMetricGroup() How does the backpressure help you here? Backpressure can originate in any operator or network connection. If it's an operator further

Re: Synchronization across tasks using checkpoint barriers

2022-02-14 Thread Gopi Krishna M
On Mon, Feb 14, 2022 at 10:00 PM Niklas Semmler wrote: > So, you want to send basically the last message before the barrier? > Yes. > > Can you not instead send the first message after the barrier? From a first > glance this sounds easier. > I'm not sure if this will help me synchronize the

Unit test harness for Sources

2022-02-14 Thread James Sandys-Lumsdaine
Hi all, I've been using the test harness classes to unit test my stateful 1 and 2 stream functions. But I also have some stateful legacy Source classes I would like to unit test and can't find any documentation or example for that - is this possible? Thanks, James.

Re: Flink 1.12.x DataSet --> Flink 1.14.x DataStream

2022-02-14 Thread Niklas Semmler
Hi Saravanan, AFAIK the last record is not treated differently. Does the approach in [1] not work? Best regards, Niklas https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279

Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Hi, I get that, but I want to output that key so I can store it in Elastic grouped by the minute. I had explained with data examples above. But just to be sure, let's pretend the current WALL time is 2022-02-14T11:38:01.123Z and I get the below clicks event time here (ignored/not

"No operators defined in streaming topology" error when Flink app still starts successfully

2022-02-14 Thread Shane Bishop
Hi all, My team has started seeing the error "java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute." However, even with this error, the Flink application starts and runs fine, and the Flink job renders fine in the Flink Dashboard. Attached is the full

Re: table api watermarks, timestamps, outoforderness and head aches

2022-02-14 Thread Francesco Guardiani
Yep, every operator usually cleans state of records past a received watermark On Mon, Feb 14, 2022 at 4:03 PM HG wrote: > Will keys that are outdated disappear? > > It is in fact a kind of session window that can start at any time. > Constantly new keys will appear. > > > > > > > On Mon, Feb

Re: unpredictable behaviour on KafkaSource deserialisation error

2022-02-14 Thread Niklas Semmler
Hi Frank, This sounds like an interesting issue. Can you share a minimal working example? Best regards, Niklas > On 9. Feb 2022, at 23:11, Frank Dekervel wrote: > > Hello, > > When trying to reproduce a bug, we made a DeserialisationSchema that throws > an exception when a malformed message

Re: Synchronization across tasks using checkpoint barriers

2022-02-14 Thread Niklas Semmler
So, you want to send basically the last message before the barrier? Can you not instead send the first message after the barrier? From a first glance this sounds easier. Can you share what you are trying to accomplish? Best regards, Niklas > On 14. Feb 2022, at 17:04, Gopi Krishna M wrote:

Re: select($("transactionId)) : cannot find symbol symbol: method $(java.lang.String)

2022-02-14 Thread Francesco Guardiani
Yep that should do it, perhaps are you willing to contribute to that docs page adding the import? :) On Mon, Feb 14, 2022 at 4:34 PM HG wrote: > The static was missing  > > import static org.apache.flink.table.api.Expressions.*; > > > > > Op ma 14 feb. 2022 om 15:45 schreef Francesco

Re: Need help on implementing custom`snapshotState` in KafkaSource & KafkaSourceReader

2022-02-14 Thread Niklas Semmler
Hi Santosh, It’s best to avoid cross-posting. Let’s keep the discussion to SO. Best regards, Niklas > On 12. Feb 2022, at 16:39, santosh joshi wrote: > > We are migrating to KafkaSource from FlinkKafkaConsumer. We have disabled > auto commit of offset and instead committing them manually to

Statefun with no Protobuf ingress and egress

2022-02-14 Thread mrAlexTFB
Hi, I have a very simple schema where one python statefun application reads from a kafka topic and writes in another kafka topic, those topics are produced and consumed with another python script as it is done in the Python Flink Walkthrough

Re: Synchronization across tasks using checkpoint barriers

2022-02-14 Thread Gopi Krishna M
Thanks Niklas! This helps with synchronizing uploads across partitioned tasks. The next step is to pass the handle to this upload to the sink which should be part of the same checkpoint. Is it possible to do the following: 1. Keep reducing the events to keyedStore. 2. On snapshotState: upload the

Re: select($("transactionId)) : cannot find symbol symbol: method $(java.lang.String)

2022-02-14 Thread HG
The static was missing  import static org.apache.flink.table.api.Expressions.*; Op ma 14 feb. 2022 om 15:45 schreef Francesco Guardiani < france...@ververica.com>: > > symbol: method $(java.lang.String) > > location: class esl.job.cag.verwerkingstijden.CagVerwerkingsTijden > > What

Save app-global cache used by RichAsyncFunction to Flink State?

2022-02-14 Thread Clayton Wohl
Is there any way to save a custom application-global cache into Flink state so that it is used with checkpoints + savepoints? This cache is used by a RichAsyncFunction that queries an external database, and RichAsyncFunction doesn't support the Flink state functionality directly. I asked this

How to cogroup multiple streams?

2022-02-14 Thread Will Lauer
OK, here's what I hope is a stupid question: what's the most efficient way to co-group more than 2 DataStreams together? I'm looking at porting a pipeline from pig to flink, and in a couple of places I use Pig's COGROUP functionality to simultaneously group 3 or 4 and sometimes even more datasets

Re: How to proper hashCode() for keys.

2022-02-14 Thread Ali Bahadir Zeybek
Hello John, That is what exactly the window operator does for you. Can you please check the documentation[1] and let us know what part of the window operator alone does not suffice for the use case? Sincerely, Ali [1]:

Re: table api watermarks, timestamps, outoforderness and head aches

2022-02-14 Thread HG
Will keys that are outdated disappear? It is in fact a kind of session window that can start at any time. Constantly new keys will appear. On Mon, Feb 14, 2022, 15:57 Francesco Guardiani wrote: > Hi, > > - bounded out of orderness: This means that you have essentially a stream > where

Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Because I want to group them for the last X minutes. In this case last 1 minute. On Mon, Feb 14, 2022 at 10:01 AM Ali Bahadir Zeybek wrote: > Hello John, > > Then may I ask you why you need to use a time attribute as part of your > key? > Why not just key by the fields like `mydomain.com` and

Re: How to proper hashCode() for keys.

2022-02-14 Thread Ali Bahadir Zeybek
Hello John, Then may I ask you why you need to use a time attribute as part of your key? Why not just key by the fields like `mydomain.com` and `some-article` in your example and use only window operator for grouping elements based on time? Sincerely, Ali On Mon, Feb 14, 2022 at 3:55 PM John
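
A rough illustration of the suggestion above (key only by the business fields and let a time window do the per-minute grouping), written in plain Java with no Flink dependency. The class name, method names, and the `key@windowStart` bucket format are hypothetical and chosen only for this sketch; it mimics a one-minute tumbling window over arrival (processing) time and is not how Flink's window operator is implemented.

```java
import java.util.Map;
import java.util.TreeMap;

public class MinuteWindowCountSketch {
    // Assumed window size for this sketch: one minute.
    static final long WINDOW_MILLIS = 60_000L;

    /** Start of the one-minute tumbling window that contains the timestamp. */
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
    }

    /**
     * Counts clicks per (key, window start). The key carries only the
     * business fields; time enters only through the window assignment.
     */
    static Map<String, Long> count(String[] keys, long[] arrivalMillis) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            String bucket = keys[i] + "@" + windowStart(arrivalMillis[i]);
            counts.merge(bucket, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String key = "mydomain.com/some-article";
        String[] keys = {key, key, key};
        long[] arrivals = {1_000L, 59_999L, 60_000L};
        // Two clicks land in the window starting at 0, one in the next window.
        System.out.println(count(keys, arrivals));
    }
}
```

Because the minute is not part of the key, the same key is reused across windows and the window start can still be emitted alongside each count.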

Re: select($("transactionId)) : cannot find symbol symbol: method $(java.lang.String)

2022-02-14 Thread HG
That's the main function. I have no idea what imports are missing. I am able to use the SQL API for the table. When I switch to .select (()) where(()) groupBy(()) I get this error. On Mon, Feb 14, 2022, 15:45 Francesco Guardiani wrote: > > symbol: method $(java.lang.String) > >

Re: table api watermarks, timestamps, outoforderness and head aches

2022-02-14 Thread Francesco Guardiani
Hi, - bounded out of orderness: This means that you have essentially a stream where events can arrive late by a certain amount of time, compared to the "newest" event received. For example, with a bounded out of orderness of 5 minutes, you essentially say to Flink that your stream can receive an
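
A minimal plain-Java sketch of the bounded out-of-orderness idea described above, with no Flink dependency. The class and method names are hypothetical. It assumes the watermark simply trails the largest timestamp seen so far by the configured bound, and that an event older than the current watermark counts as late; this is a simplification for illustration, not Flink's actual watermark generator.

```java
import java.time.Duration;

public class BoundedOutOfOrdernessSketch {
    private final long boundMillis;
    // Largest event timestamp observed so far; MIN_VALUE means "nothing seen yet".
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(Duration bound) {
        this.boundMillis = bound.toMillis();
    }

    /** Assumed rule: the watermark trails the newest event by the bound. */
    public long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - boundMillis;
    }

    /** Records an event and returns true if it is older than the current watermark. */
    public boolean observe(long eventTimestampMillis) {
        boolean late = eventTimestampMillis < currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
        return late;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch s = new BoundedOutOfOrdernessSketch(Duration.ofMinutes(5));
        System.out.println(s.observe(600_000L)); // event at minute 10
        System.out.println(s.observe(420_000L)); // minute 7, within the 5-minute bound
        System.out.println(s.observe(240_000L)); // minute 4, more than 5 minutes behind
    }
}
```

With a 5-minute bound, only the third event in `main` is more than 5 minutes behind the newest event and is reported as late.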

Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Hi, thanks. As previously mentioned, processing time. So regardless of when the event was generated, I want to count all events I have right now (as soon as they are seen by the flink job). On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek wrote: > Hello John, > > Currently you are grouping the

Re: select($("transactionId)) : cannot find symbol symbol: method $(java.lang.String)

2022-02-14 Thread Francesco Guardiani
> symbol: method $(java.lang.String) > location: class esl.job.cag.verwerkingstijden.CagVerwerkingsTijden What is esl.job.cag.verwerkingstijden.CagVerwerkingsTijden? Sounds like a bad import? Also, have you checked you have Flink deps aligned? On Mon, Feb 14, 2022 at 3:17 PM HG wrote: > >

Re: Joining Flink tables with different watermark delay

2022-02-14 Thread Francesco Guardiani
Hi, So my understanding of your query is that you want to do a join first, and then group by a 60 minutes distance and aggregate them. Please correct me if I'm wrong. First of all, the query you've posted is incorrect and should fail, as its plan is invalid because it's using a regular join.

Re: Synchronization across tasks using checkpoint barriers

2022-02-14 Thread Niklas Semmler
Hi Gopi, You can implement CheckpointedFunction and use the method snapshotState(FunctionSnapshotContext) to upload state on each checkpoint. https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html Make sure, you

select($("transactionId)) : cannot find symbol symbol: method $(java.lang.String)

2022-02-14 Thread HG
Hi, When I do : Table counts = t .groupBy($("transactionId")) .select($("transactionId"), $("handlingTime").sum().as("summedhandlingTime")); The code below fails with : cannot find symbol .select($("transactionId"),

flink-netty-shuffle files fill up the node disks

2022-02-14 Thread 智能平台
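Hello everyone: Running the code below fills up the disks on all nodes; when debugging locally, the C drive also filled up. File name: flink-netty-shuffle-b71f58a6-0fdb-437f-a6b8-6b0bbedd3dfa Notes: 1. Batch mode 2. In local testing the input directories oneDay and long are about 1G in size; after the program starts, it fills up the remaining tens of GB on the C drive (C:\Users\xxx \AppData\Local\Temp), and after deploying to the cluster it also gradually fills up the disks on every node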

Error when using OPTIONS in a SELECT statement after creating a view in flink

2022-02-14 Thread liangjinghong
Hello everyone, the following code runs in the development environment but fails after being packaged and deployed: Code: CREATE VIEW used_num_common (toolName,region,type,flavor,used_num) AS select info.toolName as toolName,r.regionName as region,f.type,f.flavor,count(1) as used_num from tbl_schedule_job/*+ OPTIONS('server-id'='1001-1031') */ job join tbl_schedule_task/*+

Re: How to proper hashCode() for keys.

2022-02-14 Thread Ali Bahadir Zeybek
Hello John, Currently you are grouping the elements two times based on some time attribute, one while keying - with event time - and one while windowing - with processing time. Therefore, the windowing mechanism produces a new window computation when you see an element with the same key but