Re: Log4j2 configuration

2022-02-16 Thread Tamir Sagi
Hey, I encountered the same issue while upgrading from v1.12.2 to v1.14.2 a few weeks ago. Starting from v1.13.0, the deployment flow has changed in K8s native mode (details about its flow in [1]). User input properties get overridden in flink-console.sh, which means pointing to an xml file (-Dlog4j2

Re: Performance Issues in Source Operator while migrating to Flink-1.14 from 1.9

2022-02-16 Thread Arujit Pradhan
Hey Martijn, Thanks a lot for getting back to us. To give you a little bit more context, we maintain an open-source project around Flink called Dagger, which is a wrapper for proto processing. As part of the upgrade to the latest version, we did some refactoring and

Implement watermark buffering with Process Function

2022-02-16 Thread Ruibin Xing
Hi, I'm trying to implement customized state logic with KeyedProcessFunction, but I'm not quite sure how to implement the correct watermark behavior when late data is involved. According to the answer on Stack Overflow:

Re: Flink 1.12.8 release

2022-02-16 Thread Martijn Visser
Hi Joey, Since the Flink community only supports the latest and previous minor release [1] (currently Flink 1.14 and 1.13), I'm not expecting another release of Flink 1.12. Best regards, Martijn Visser https://twitter.com/MartijnVisser82 [1]

Re: Flink 1.12.8 release

2022-02-16 Thread Joey L
Hey Martijn, Thanks for the response. That's unfortunate; I assumed there would be a 1.12.8 release since there are many Flink issues in JIRA marked with `Fix Versions: 1.12.8`, and I can see that there are many unreleased commits in the release-1.12 branch. Any chance that they would be released at

Python Function for Datastream Transformation in Flink Java Job

2022-02-16 Thread Jesry Pandawa
Hello, Currently, Flink already supports adding a Python UDF and using it in a Flink Java job via the Table API. Can we do the same for creating a custom Python function for a DataStream transformation and use that in a Flink Java job? Regards, Jesry

getting "original" ingestion timestamp after using a TimestampAssigner

2022-02-16 Thread Frank Dekervel
Hello, I'm getting messages from a Kafka stream. The messages are JSON records with a "timestamp" key in the JSON; this timestamp contains the time at which the message was generated. Now I'd like to know if these messages had a delivery delay (e.g. the delay between message generation and arrival in

Re: Log4j2 configuration

2022-02-16 Thread Chesnay Schepler
Hmm, yes, then it is indeed weird that it can't find the logger, but their error messages are notorious for being misleading in my experience. Can you set the log4j2.debug system property (to any value, even an empty string) and try again? If that doesn't reveal anything I would try the

Re: Re: [External] : Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

2022-02-16 Thread Yun Gao
Hi Fuyao, Very sorry for the late reply. For question 1, I think it would not cause data corruption: in Flink the checkpoint is achieved via inserting barriers into the stream of normal records, and the snapshot is taken in the same thread as the record processing. Thus the snapshot of
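Yun's point — a barrier is just another element in the record stream, and the snapshot is taken on the same thread that processes records — can be sketched with a toy, single-threaded loop. This is an illustration only, not Flink's implementation; all names are mine:

```python
# Toy single-threaded operator loop: because the barrier travels in-band and
# the snapshot happens on the processing thread, a snapshot can never observe
# state mid-record, so no data corruption is possible.
def run(stream):
    state, snapshots = 0, []
    for element in stream:
        if element == "BARRIER":
            snapshots.append(state)  # snapshot taken in the processing thread
        else:
            state += element         # normal record processing
    return state, snapshots

print(run([1, 2, "BARRIER", 3]))  # (6, [3]): snapshot sees exactly records 1 and 2
```

The key property the sketch shows: the snapshot value (3) reflects precisely the records that preceded the barrier, nothing more and nothing less.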

AWS Kinesis Flink vs K8s

2022-02-16 Thread Puneet Duggal
Hi, Just wanted to ask the community various pros and cons of deploying flink using AWS Kinesis vs using K8s application mode. Currently we are deploying flink cluster in HA session standalone mode and planning to switch to application deployment mode. Regards, Puneet

Re: Statefun with no Protobuf ingress and egress

2022-02-16 Thread Igal Shilman
Hello, I've noticed that you've linked to a very old release of Stateful Functions (2.0); StateFun 3.2, which is the latest, added support for exactly that. You are no longer required to use Protobuf; you can simply send strings and even JSON. Check out the following repository for some

Re: [statefun] Looking for a polyglot example

2022-02-16 Thread Igal Shilman
Hello, You can take a look at the Flink Stateful Functions playground [1], where you can find a handful of examples in all the supported SDKs; in addition, for each language you will find a walkthrough that shows how to use the individual SDK features. Furthermore, take a look at the documentation

Re: Exception Help

2022-02-16 Thread Francesco Guardiani
Hi, From what I understand, you're creating a scalar function taking a string with JSON and then converting it to a map using a custom function. Assuming I understood correctly, I think the problem here is that you're using internal data types for UDFs, which is discouraged in most of the use

Re: AWS Kinesis Flink vs K8s

2022-02-16 Thread Danny Cranmer
+Jeremy who can help answer this question. Thanks, On Wed, Feb 16, 2022 at 10:26 AM Puneet Duggal wrote: > Hi, > > Just wanted to ask the community various pros and cons of deploying flink > using AWS Kinesis vs using K8s application mode. Currently we are deploying > flink cluster in HA

Re: Basic questions about resuming stateful Flink jobs

2022-02-16 Thread Piotr Nowojski
Hi James, Sure! The basic idea of checkpoints is that they are fully owned by the running job and used for failure recovery. Thus, by default, if you stop the job, the checkpoints are removed. If you want to stop a job and then later resume working from the same point that it has previously

Re: How to proper hashCode() for keys.

2022-02-16 Thread Ali Bahadir Zeybek
Hello John, The requirement you have can be achieved by having a process window function in order to enrich the aggregate data with metadata information of the window. Please have a look at the training example[1] to see how to access the window information within a process window function.

Re: getting "original" ingestion timestamp after using a TimestampAssigner

2022-02-16 Thread Piotr Nowojski
Hi Frank, I'm not sure exactly what you are trying to accomplish, but yes: in the TimestampAssigner you can only return what the new timestamp for the given record should be. If you want to use "ingestion time" - "true event time" as some kind of delay metric, you will indeed need to have both
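Piotr's suggestion — keep both timestamps on the record and compute the difference as the delay metric — can be sketched in plain Python. This is not Flink API; the record shape and field names are my own illustration:

```python
from dataclasses import dataclass

@dataclass
class Enriched:
    payload: dict
    event_ts_ms: int      # the "timestamp" key parsed from the JSON body
    ingestion_ts_ms: int  # e.g. the Kafka record timestamp, captured before reassignment

    @property
    def delay_ms(self) -> int:
        # "ingestion time" - "true event time": the delivery delay metric
        return self.ingestion_ts_ms - self.event_ts_ms

record = Enriched({"value": 42}, event_ts_ms=1_000, ingestion_ts_ms=1_250)
print(record.delay_ms)  # 250
```

The point is simply that the TimestampAssigner alone can't report this: the original arrival timestamp has to be carried alongside the reassigned event time.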

Re: Job manager slots are in bad state.

2022-02-16 Thread Piotr Nowojski
Hi Josson, Would you be able to reproduce this issue on a more recent version of Flink? I'm afraid that we won't be able to help with this issue, as it affects a Flink version that has not been supported for quite some time; moreover, `SlotSharingManager` has been completely removed in Flink 1.13.

Re: Exception Help

2022-02-16 Thread Francesco Guardiani
Are you sure you're always matching the output row type provided by DynamicTableFactory? Also, looking at the javadocs, it seems like you can

Fwd: How to get memory specific metrics for tasknodes

2022-02-16 Thread Diwakar Jha
Hello, Could someone please help? I'm trying to publish only these three metrics per tasknode: Status.JVM.Memory.Heap.Used, Status.JVM.Memory.Heap.Committed, Status.JVM.Memory.NonHeap.Max. But with my current setting I see all Flink metrics getting published. Please let me know if I need to

SQL / Table Api lag() over partition by ... and windowing

2022-02-16 Thread HG
Hello all, I need to calculate the difference in time between ordered rows per transactionId. All events should arrive within the timeframe set by the out-of-orderness (a couple of minutes); events outside this time should be ignored. In SQL this would be: select transactionId, handlingTime,
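The SQL in the message is cut off, but the computation it describes (a LAG() over rows partitioned by transactionId) can be sketched in plain Python. This is an illustration of the logic only, not Flink or SQL API; names are mine:

```python
def handling_time_deltas(events):
    """events: iterable of (transaction_id, handling_time_ms) pairs, assumed
    already ordered per transaction (late data beyond the out-of-orderness
    bound has been dropped). Emits (tx, ts, ts - previous_ts), i.e. what
    LAG(handlingTime) OVER (PARTITION BY transactionId ORDER BY ...) gives."""
    last = {}   # last seen handlingTime per transactionId
    out = []
    for tx, ts in events:
        if tx in last:
            out.append((tx, ts, ts - last[tx]))
        last[tx] = ts
    return out

rows = [("a", 100), ("b", 50), ("a", 160), ("a", 200)]
print(handling_time_deltas(rows))  # [('a', 160, 60), ('a', 200, 40)]
```

Note that the first row of each partition produces no delta, matching LAG() returning NULL for the first row.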

Re: Performance Issues in Source Operator while migrating to Flink-1.14 from 1.9

2022-02-16 Thread Piotr Nowojski
Hi, Unfortunately the new KafkaSource was contributed without good benchmarks, and so far you are the first to notice and report this issue. Without a more direct comparison (as Martijn suggested), it's hard for us to help right away. It would be a tremendous help for us if you could for

Re: Problem with kafka with key=None using pyhton-kafka module

2022-02-16 Thread Igal Shilman
Hello, The default Kafka ingress for remote functions does require a key component. The key is translated to the 'id' part of the receiving function address. If your functions are stateless, or the id doesn't have a meaning for you, you can simply provide a random id. I hope that helps,
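A small plain-Python illustration of Igal's suggestion: when the function id carries no meaning, produce each record with a random key. The record shape here (key string, value bytes) is an assumption for illustration, not the StateFun API:

```python
import json
import uuid

def to_ingress_record(payload: dict) -> tuple[str, bytes]:
    # The key becomes the 'id' of the target function address; for stateless
    # functions a random UUID is fine, since no state is keyed on it.
    return str(uuid.uuid4()), json.dumps(payload).encode("utf-8")

key, value = to_ingress_record({"hello": "world"})
print(len(key))  # 36: the canonical UUID string length
```

Any producer (e.g. kafka-python's `producer.send(topic, key=..., value=...)`) can then ship the pair as-is.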

Basic questions about resuming stateful Flink jobs

2022-02-16 Thread James Sandys-Lumsdaine
Hi all, I have a 1.14 Flink streaming workflow with many stateful functions that has a FsStateBackend and checkpointing enabled, although I haven't set a location for the checkpointed state. I've really struggled to understand how I can stop my Flink job and restart it and ensure it carries

Re: Failed to serialize the result for RPC call : requestMultipleJobDetails after Upgrading to Flink 1.14.3

2022-02-16 Thread Chirag Dewan
Thanks Chesnay. So it should be the /jobs API only? If that's the case I can disable my dashboards. On Wed, 16 Feb 2022 at 2:01 pm, Chesnay Schepler wrote: There are no side-effects; it just means that certain pages of the UI / REST API aren't working

RE: Basic questions about resuming stateful Flink jobs

2022-02-16 Thread Sandys-Lumsdaine, James
Thanks for your reply, Piotr. Some follow-up questions: >"Nevertheless you might consider enabling them as this allows you to >manually cancel the job if it enters an endless recovery/failure loop, fix the >underlying issue, and restart the job from the externalised checkpoint." How is this

Re: Change column names Pyflink Table/Datastream API

2022-02-16 Thread Dian Fu
Hi Francis, There should be multiple ways to achieve this. Do you mean that all these methods don't work for you? If so, could you show the sample code? Besides, another way you may try is `inputmetrics.alias("timestamp, device, name, value")`. Regards, Dian On Wed, Feb 16, 2022 at 8:14 AM

Re: Exception Help

2022-02-16 Thread Jonathan Weaver
No, I'm creating a custom SQL lookup table (which uses AsyncTableFunction) which requires the internal types. I implement the LookupTableSource, AsyncTableFunction, DynamicTableSourceFactory trio as per the examples in the docs. My construction is the equivalent of this, and it still errors with

Re: Joining Flink tables with different watermark delay

2022-02-16 Thread Francesco Guardiani
> We plan to use this parquet source to create a Hybrid Source later. Hence, we had to use a File Source. FYI there is an open issue for this: https://issues.apache.org/jira/browse/FLINK-22793, but for the other points it makes sense to create the data stream directly, as it circumvents the

Re: Python Function for Datastream Transformation in Flink Java Job

2022-02-16 Thread Piotr Nowojski
Hi, As far as I can tell the answer is unfortunately no. With Table API (SQL) things are much simpler, as you have a restricted number of types of columns that you need to support and you don't need to support arbitrary Java classes as the records. I'm shooting blindly here, but maybe you can

Re: SQL / Table Api lag() over partition by ... and windowing

2022-02-16 Thread Francesco Guardiani
> Which does not work since it cannot find lag function :-( lag and over are not supported at the moment with Table, so you need to use SQL for that. > *Will this obey the watermark strategy of the original Datastream? (see further below)* Yes. The code at the end of the mail is correct and

Task manager errors with Flink ZooKeeper High Availability

2022-02-16 Thread Koffman, Noa (Nokia - IL/Kfar Sava)
Hi, We are currently running flink in session deployment on k8s cluster, with 1 job-manager and 3 task-managers To support recovery from job-manager failure, following a different mail thread, We have enabled zookeeper high availability using a k8s Persistent Volume To achieve this, we’ve added

Re: SQL / Table Api lag() over partition by ... and windowing

2022-02-16 Thread HG
Thanks. Would the option for DataStream be to write a MapPartitionFunction? On Wed, 16 Feb 2022 at 16:35, Francesco Guardiani < france...@ververica.com> wrote: > > Which does not work since it cannot find lag function :-( > > lag and over are not supported at the moment with Table, so you need

Re: Implement watermark buffering with Process Function

2022-02-16 Thread David Anderson
I'm afraid not. The DataStream window implementation uses internal APIs to manipulate the state backend namespace, which isn't possible to do with the public-facing API. And without this, you can't implement this as efficiently. David On Wed, Feb 16, 2022 at 12:04 PM Ruibin Xing wrote: > Hi, >

Re: Basic questions about resuming stateful Flink jobs

2022-02-16 Thread Cristian Constantinescu
Hi James, I literally just went through what you're doing at my job. While I'm using Apache Beam and not the Flink api directly, the concepts still apply. TL;DR: it works as expected. What I did is I set up a kafka topic listener that always throws an exception if the last received message's

Re:Re: [statefun] Looking for a polyglot example

2022-02-16 Thread casel.chen
Thank you Igal. The remote functions in different languages can be "mixed" by deploying different unit services in the Dockerfile, and they can exchange messages via common message types like JSON, protobuf etc., or even be connected by in/out Kafka topics, right? At 2022-02-16

Apache Flink - User Defined Functions - Exception when passing all arguments

2022-02-16 Thread M Singh
Hi: I have a simple concatenate UDF (for testing purposes) defined as: public static class ConcatenateFunction extends ScalarFunction { public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... inputs) { return Arrays.stream(inputs).map(i ->
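The Java snippet in the message is truncated. As a hedged illustration only (plain Python, not the thread's Java UDF), the varargs-concatenation logic the eval method appears to implement looks like this:

```python
def concatenate(*inputs) -> str:
    # Mirror of eval(Object... inputs): convert each argument to its string
    # form and join them, whatever the argument types are.
    return "".join(str(i) for i in inputs)

print(concatenate("a", 1, True))  # a1True
```

In the Java version the same effect comes from `Arrays.stream(inputs).map(...)` followed by a joining collector; the `@DataTypeHint(inputGroup = InputGroup.ANY)` annotation is what lets the UDF accept arbitrary argument types.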

Re: Change column names Pyflink Table/Datastream API

2022-02-16 Thread Francis Conroy
Hi Dian, Using .alias ended up working for me. Thanks for getting back to me. On Thu, 17 Feb 2022 at 01:15, Dian Fu wrote: > Hi Francis, > > There should be multiple ways to achieve this. Do you mean that all these > methods don't work for you? If so, could you show the sample code? Besides,

Re: Implement watermark buffering with Process Function

2022-02-16 Thread David Anderson
I've done some work on this with Nico Kruber. In our benchmarking, the performance loss (from not being able to use the namespace) was roughly a factor of two, so it is significant. We prototyped an API extension that addresses this particular concern but without exposing the namespace directly,

Re: Fwd: How to get memory specific metrics for tasknodes

2022-02-16 Thread Chesnay Schepler
It is currently not possible to select metrics. What you can do, however, is create a custom reporter that wraps the StatsD reporter and does this filtering. On 16/02/2022 17:41, Diwakar Jha wrote: Hello, Could someone please help? I'm trying to publish only these three metrics per

Re: Json deserialisation with .jsonValue vs format=json in Table API

2022-02-16 Thread Илья Соин
Thank you, Francesco > On 3 Feb 2022, at 18:21, Francesco Guardiani wrote: > > Hi, > > I think the more stable option would be the first one, as it also gives you > more flexibility. Reading the row as string and then parsing it in a query > definitely costs more, and makes less

Re: Failed to serialize the result for RPC call : requestMultipleJobDetails after Upgrading to Flink 1.14.3

2022-02-16 Thread Chesnay Schepler
There are no side-effects; it just means that certain pages of the UI / REST API aren't working (i.e., the overview over all jobs). On 16/02/2022 06:15, Chirag Dewan wrote: Ah, should have looked better. I think https://issues.apache.org/jira/browse/FLINK-25732 causes this. Are there any

Re: Confusion about CEP

2022-02-16 Thread yue ma
The images don't load; could you share the code? 翼 之道 wrote on Wed, 16 Feb 2022 at 17:44: > I wrote a demo program doing simple pattern matching; the code is below, but every input record is treated as late > > Every input record is emitted through the late-data stream, and no pattern-matching computation happens > > Why is this? I have verified other, more complex patterns successfully; why doesn't this simplest one produce the result I expect? >

org.apache.flink.runtime.rpc.exceptions.FencingTokenException

2022-02-16 Thread casel.chen
Hello, I have a Flink 1.13.2 on native Kubernetes application job that hits the following exception; what could be the cause? Starting kubernetes-application as a console application on host dc-ads-ptfz-nspos-sib-trans-sum-6d9dbf587b-tgbmx. ERROR StatusLogger Reconfiguration failed: No configuration found for '135fbaa4' at 'null' in 'null'

Confusion about CEP

2022-02-16 Thread 翼 之道
I wrote a demo program doing simple pattern matching; the code is in the attached images [images not included]. Every input record is output through the late-data stream, and no pattern-matching computation happens [image not included]. Why is this? I have verified other, more complex patterns successfully; why doesn't this simplest one produce the result I expect?

Re: Error using OPTIONS after a SELECT statement on a view created in Flink

2022-02-16 Thread godfrey he
Hi liangjinghong, The reason is that the blink planner introduces and modifies the SqlTableRef class, while the legacy planner does not include SqlTableRef, so the SqlTableRef from Calcite (which has a bug) gets loaded instead. Solution: if you only use the blink planner, you can remove the legacy planner jar from lib. Best, Godfrey. liangjinghong wrote on Mon, 14 Feb 2022 at 17:26: > Dear teachers, the following code works in the development environment but fails after packaging and deployment: > > Code: > > CREATE

Re: Re: flink sql jdbc sink transaction commit question

2022-02-16 Thread casel.chen
If MySQL is not configured with auto-commit, at which step is the transaction committed? On 2022-02-16 10:24:39, "Michael Ran" wrote: > As I recall, the JDBC driver for MySQL defaults to auto-commit. Not sure about Phoenix. > On 2022-02-15 13:25:07, "casel.chen" wrote: >> Recently I've been extending the flink sql jdbc

Timestamps getting scrambled after batching records together for processing and then splitting them again

2022-02-16 Thread yidan zhao
As shown in the figure (not included), here is the behavior, analyzed as follows: Record A: ts1; Record B: ts2. Suppose ts1 and ts2 are event times belonging to different windows. However, after adding batching logic via a processFunc that outputs the two elements together as a List to the next operator, which processes them and then splits them again, both records A and B end up with event time ts2. This scrambles the timing of subsequent window operations. Does anyone have good ideas for solving this? I'm currently considering two approaches: (1) regenerate the event timestamp after splitting the data; (2) change the processFunc to a keyedProcessFunc and put the window information into the keyBy fields.
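Approach (1) above — carry each element's own event time through the batch and re-stamp after splitting — can be sketched in plain Python. This is an illustration of the idea only, not Flink API; the record shapes are my own:

```python
def batch(elements):
    # Carry each element's own event time inside the batch, instead of letting
    # the whole List inherit the last element's timestamp downstream.
    return [{"value": v, "event_ts": ts} for v, ts in elements]

def unbatch(batched):
    # After splitting, re-assign each element's timestamp from the carried
    # field, so A keeps ts1 and B keeps ts2.
    return [(e["value"], e["event_ts"]) for e in batched]

print(unbatch(batch([("A", 1), ("B", 2)])))  # [('A', 1), ('B', 2)]
```

In Flink terms, the re-stamping step would correspond to assigning timestamps again (e.g. a new watermark strategy) on the stream produced after the split.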

Fwd: Error using OPTIONS after a SELECT statement on a view created in Flink

2022-02-16 Thread liangjinghong
Thank you for the reply, but the lib directory in my deployment environment doesn't contain the jar you mentioned. Which jar should I remove? The jars in my lib: flink-csv-1.13.0.jar flink-dist_2.11-1.13.0.jar flink-json-1.13.0.jar flink-shaded-zookeeper-3.4.14.jar flink-sql-connector-mysql-cdc-2.1.1.jar flink-table_2.11-1.13.0.jar flink-table-blink_2.11-1.13.0.jar log4j-1.2-api-2.12.1.jar

Unsubscribe

2022-02-16 Thread qhp...@hotmail.com
qhp...@hotmail.com

Flink not triggering checkpoints

2022-02-16 Thread 董少杰
Flink reads a CSV file to build one table and consumes Kafka data to build another; the two tables are joined and written to HDFS (Hudi). Once the task reading the CSV data reaches FINISHED state, checkpoints are no longer triggered. Is there any way to make it trigger checkpoints normally? Flink version 1.12.2. Thanks! | 董少杰 | eric21...@163.com |

Re: flink-cdc: slave with the same server_uuid/server_id as this slave has connected to the master

2022-02-16 Thread chengyanan1...@foxmail.com
Hello: Flink CDC implements real-time MySQL sync on top of Debezium, which reads the MySQL binlog acting as a slave server. By default, a random number between 5400 and 6400 is generated as this Debezium client's server-id, and that id must be unique within the MySQL cluster. This error means a server-id is duplicated; I suggest explicitly configuring the "server-id" parameter, either as a single number or as a range. Also, when scan.incremental.snapshot.enabled
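A toy sketch of why a configured server-id range must cover the source parallelism: each parallel reader needs its own id, unique within the MySQL cluster. This is illustrative logic only, not the connector's code; the function name is mine:

```python
def assign_server_ids(range_start: int, range_end: int, parallelism: int) -> list[int]:
    """Hand out one distinct server-id per parallel reader from a configured
    range like '5400-5404'. Raises if the range is too small."""
    available = range_end - range_start + 1
    if parallelism > available:
        raise ValueError(
            f"server-id range {range_start}-{range_end} has only {available} "
            f"ids but parallelism is {parallelism}"
        )
    return [range_start + i for i in range(parallelism)]

print(assign_server_ids(5400, 5404, 4))  # [5400, 5401, 5402, 5403]
```

Duplicated ids are exactly what produces the "slave with the same server_uuid/server_id" error, so keeping the assignment injective is the whole point.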