Re: REST API in an HA setup - must the leading JM be called?

2021-08-18 Thread Juha Mynttinen
Thank you, answers my questions. -- Regards, Juha On Wed, Aug 18, 2021 at 2:28 PM Chesnay Schepler wrote: > You've pretty much answered the question yourself. *thumbs up* > > For the vast majority of cases you can call any JobManager. > The exceptions are jar operations (because they are

failures during job start

2021-08-18 Thread Colletta, Edward
Any help with this would be appreciated. Is it possible that this is a data/application issue or a Flink config/resource issue? Using Flink 1.11.2, Java 11, a session cluster, 5 nodes with 32 cores each. I have an issue where starting a job takes a long time and sometimes fails with

Pre-shuffle aggregation in Flink is not working

2021-08-18 Thread suman shil
I am trying to do pre-shuffle aggregation in Flink. Following is the MapBundleFunction implementation: public class TaxiFareMapBundleFunction extends MapBundleFunction { @Override public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception { if (value ==
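
For reference, a minimal sketch of such a bundle function, assuming Flink's internal bundle API (org.apache.flink.table.runtime.operators.bundle.MapBundleFunction) and the flink-training TaxiFare datatype with a public totalFare field; the message's own code is truncated above, so this body is an illustration, not the poster's actual implementation:

```java
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.util.Collector;

// Pre-aggregates fares per key inside a bundle before the shuffle.
// TaxiFare is assumed to be the flink-training datatype.
public class TaxiFareMapBundleFunction
        extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {

    @Override
    public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
        if (value == null) {
            return input; // first fare for this key in the current bundle
        }
        value.totalFare += input.totalFare; // fold the new fare into the running sum
        return value;
    }

    @Override
    public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
        // Emit one pre-aggregated record per key when the bundle is flushed.
        for (TaxiFare fare : buffer.values()) {
            out.collect(fare);
        }
    }
}
```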

Timer Service vs Custom Triggers

2021-08-18 Thread Aeden Jameson
My use case is that I'm producing a set of measurements by key every 60 seconds. Currently, this is handled with the usual pattern of keyBy().window(Tumbling...(60)).process(...) I need to provide the same output, but as a result of a timeout. The data needed for the timeout summary will be in
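
One common way to get the timeout variant is a KeyedProcessFunction that re-arms a processing-time timer on each element, instead of a custom trigger. A minimal sketch, with String keys and Long measurements as placeholder types:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits the per-key sum once no element has arrived for 60 seconds.
public class TimeoutSumFunction extends KeyedProcessFunction<String, Long, Long> {

    private transient ValueState<Long> sum;
    private transient ValueState<Long> timer; // timestamp of the currently armed timer

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
        timer = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        sum.update(sum.value() == null ? value : sum.value() + value);

        // Re-arm: drop the previous timer and fire 60s after this element.
        if (timer.value() != null) {
            ctx.timerService().deleteProcessingTimeTimer(timer.value());
        }
        long fireAt = ctx.timerService().currentProcessingTime() + 60_000L;
        ctx.timerService().registerProcessingTimeTimer(fireAt);
        timer.update(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        if (sum.value() != null) {
            out.collect(sum.value()); // the timeout summary
        }
        sum.clear();
        timer.clear();
    }
}
```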

Error while deserializing the element

2021-08-18 Thread vijayakumar palaniappan
Setup specifics: version 1.6.2, RocksDB, MapState, timers stored in RocksDB. When this job has been running for a long period (> 30 days) and restarts for some reason, we encounter "Error while deserializing the element". Is this a known issue fixed in later versions? I see some

Re: Periodic output at end of stream

2021-08-18 Thread Matthias Broecheler
Hey JING, thanks for getting back to me. I tried to produce the smallest, self-contained example that produces the phenomenon: https://gist.github.com/mbroecheler/fd27dd8a810b038ec463cdd5339d290f If you run MainRepl you should see an infinite loop of re-processing the 5 integers. The offending

Setting S3 parameters in a K8 jobmanager deployment

2021-08-18 Thread Robert Cullen
I have a Kubernetes JobManager deployment that requires parameters to be passed on the command line rather than retrieved from the flink-config map. Is there a way to do this? apiVersion: apps/v1 kind: Deployment metadata: name: flink-jobmanager spec: replicas: 1 # Set the value to greater
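
If the goal is to feed S3 settings to the JobManager as arguments, one possible shape is sketched below. It assumes the official Flink image, whose docker-entrypoint.sh forwards extra container args to the cluster entrypoint as -D dynamic properties (verify this for your Flink version; the FLINK_PROPERTIES environment variable is the other common route). The endpoint and credentials are placeholders:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: flink:1.13   # placeholder tag
          # Extra args become -D dynamic properties instead of flink-conf.yaml entries.
          args:
            - jobmanager
            - -Ds3.endpoint=http://minio.example:9000
            - -Ds3.access-key=PLACEHOLDER_ACCESS_KEY
            - -Ds3.secret-key=PLACEHOLDER_SECRET_KEY
          ports:
            - containerPort: 8081
              name: webui
```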

Re: Validating Flink SQL without registering with StreamTableEnvironment

2021-08-18 Thread Ingo Bürk
Hi Yuval, I can expand a bit more on the technical side of validation, though as a heads-up, I don't have a solution. When validating entire pipelines on a logical level, you run into the (maybe obvious) issue that statements depend on previous statements. In the simple case of a CREATE TABLE

Process suspends when getting Hana connection in open method of sink function

2021-08-18 Thread Chenzhiyuan(HR)
Dear all: I have a problem when I want to sink data to a Hana database. The process suspends when getting the Hana connection in the open method of the sink function, as below. My Flink version is 1.10. public class HrrmPayValueSumToHana extends RichSinkFunction { @Override public void open(Configuration
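
A minimal sketch of acquiring the connection in open() with an explicit login timeout, so a blocked connection attempt fails fast instead of suspending the task. The JDBC URL and credentials are placeholders; the HANA driver and a String record type are assumptions here:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class HrrmPayValueSumToHana extends RichSinkFunction<String> {

    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Fail after 30s rather than blocking forever on an unreachable port.
        DriverManager.setLoginTimeout(30);
        connection = DriverManager.getConnection(
                "jdbc:sap://hana-host:30015/?databaseName=HRRM", // placeholder URL
                "user", "password");                             // placeholder credentials
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // ... write the record with a PreparedStatement ...
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}
```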

Re: flink Kinesis Consumer Connected but not consuming

2021-08-18 Thread Danny Cranmer
Hey Tarun, your application looks OK and should work. I did notice this, though I cannot imagine it is an issue unless you are not setting the region correctly: - getKafkaConsumerProperties() Make sure you are setting the correct region (AWSConfigConstants.AWS_REGION) in the
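
For reference, a minimal sketch of wiring the region into the consumer properties; the stream name, region, and initial position are placeholders:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisConsumerConfig {

    // Builds a consumer with the region set explicitly, as suggested above.
    public static FlinkKinesisConsumer<String> build() {
        Properties props = new Properties();
        props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder region
        props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
        return new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), props);
    }
}
```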

Fixed-delay restart strategy - attempt counting logic

2021-08-18 Thread much l
Hi everyone: When the restart strategy is restart-strategy: fixed-delay, is the parameter restart-strategy.fixed-delay.attempts a global count (over the job's lifetime)? Or is the retry count reset each time the job recovers through the HA failover strategy, so that the next failure starts counting from 0 again?
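
For reference, the configuration being asked about, with placeholder values (flink-conf.yaml):

```yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```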

Re: Validating Flink SQL without registering with StreamTableEnvironment

2021-08-18 Thread Yuval Itzchakov
Thanks Ingo! I just discovered this a short while before you posted :) Ideally, I'd like to validate that the entire pipeline is set up correctly. The problem is that I can't use methods like `tableEnv.sqlQuery` from multiple threads, and this is really limiting my ability to speed up the process

Re: Flink 1.13.1, Hive dialect: INSERT OVERWRITE does not clear the table's existing data when the inserted result set is empty

2021-08-18 Thread Rui Li
Hello, please open a JIRA to track this. On Tue, Aug 17, 2021 at 2:47 PM Asahi Lee <978466...@qq.com.invalid> wrote: > Hi! > > With the SQL below, when my SELECT returns 0 rows, the target table's original data is not cleared after the job finishes; when I run the same statement in the Hive client, the table is cleared! > INSERT OVERWRITE target_table SELECT * from source_table where id > 10; -- Best regards! Rui Li

Re: Validating Flink SQL without registering with StreamTableEnvironment

2021-08-18 Thread Ingo Bürk
Hi Yuval, if syntactical correctness is all you care about, parsing the SQL should suffice. You can get a hold of the parser from TableEnvironmentImpl#getParser and then run #parse. This will require you to cast your table environment to the (internal) implementation, but maybe this works for
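
A minimal sketch of that suggestion, assuming a Flink 1.13-era internal API (signatures may change between releases); in my understanding #parse also resolves referenced catalog objects, so it checks more than raw syntax:

```java
import java.util.List;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.operations.Operation;

public class SqlValidationSketch {
    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Internal API: cast to the implementation to reach the planner's parser.
        List<Operation> ops = ((TableEnvironmentImpl) env)
                .getParser()
                .parse("SELECT 1"); // throws on syntax errors

        System.out.println("Parsed " + ops.size() + " operation(s)");
    }
}
```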

Re: REST API in an HA setup - must the leading JM be called?

2021-08-18 Thread Chesnay Schepler
You've pretty much answered the question yourself. *thumbs up* For the vast majority of cases you can call any JobManager. The exceptions are jar operations (because they are persisted in the JM-local filesystem, and other JMs don't know about them) and triggering savepoints (because metadata

Validating Flink SQL without registering with StreamTableEnvironment

2021-08-18 Thread Yuval Itzchakov
Hi, I have a use-case where I need to validate hundreds of Flink SQL queries. Ideally, I'd like to run these validations in parallel. But, given that there's an issue with Calcite and the use of thread-local storage, I can only interact with the table runtime via a single thread. Ideally, I

REST API in an HA setup - must the leading JM be called?

2021-08-18 Thread Juha Mynttinen
I have questions related to REST API in the case of ZooKeeper HA and a standalone cluster. But I think the questions apply to other setups too such as YARN. Let's assume a standalone cluster with multiple JobManagers. The JobManagers elect the leader among themselves and register that to

Re: Re: Error when combining the CUMULATE function with comparison functions

2021-08-18 Thread 李航飞
Oh, I see, thank you. So comparison functions cannot be combined with it, while simple conditions (where a = '2') do work. Will this be adjusted in a future release? On 2021-08-18 16:21:20, "Caizhi Weng" wrote: >Hi! > >Currently window TVFs can only be applied in two scenarios: window aggregation and window top-N. If the WHERE condition filters the result of the window >aggregation, use HAVING instead of WHERE; if it filters the data before it enters the window, you can CREATE VIEW. > >李航飞 wrote on Wed, Aug 18, 2021 at 3:55 PM: > >>

Re: Can the Operator State API be used in a Flink SQL UDF?

2021-08-18 Thread Caizhi Weng
Hi! SQL does not currently support stateful UDFs; you may need to implement this with the DataStream API instead. See the docs: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/process_function/ andrew <15021959...@163.com> wrote on Tue, Aug 17, 2021 at 7:04 PM: > Hi:

Re: Error when combining the CUMULATE function with comparison functions

2021-08-18 Thread Caizhi Weng
Hi! Currently window TVFs can only be applied in two scenarios: window aggregation and window top-N. If the WHERE condition is meant to filter the result of the window aggregation, use HAVING instead of WHERE; if it filters the data before it enters the window, you can CREATE VIEW. 李航飞 wrote on Wed, Aug 18, 2021 at 3:55 PM: > Building a data-processing pipeline with Flink SQL: > SELECT window_start,window_end,SUM(price) > > FROM TABLE( > > CUMULATE(TABLE
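
Applied to the Bid example from the quoted question, the HAVING variant would look roughly like this (the threshold is a placeholder):

```sql
SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end
HAVING SUM(price) > 100;
```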

Error when combining the CUMULATE function with comparison functions

2021-08-18 Thread 李航飞
I am building a data-processing pipeline with Flink SQL: SELECT window_start,window_end,SUM(price) FROM TABLE( CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL '10' MINUTES)) GROUP BY window_start,window_end; The statement is roughly as above. It executes successfully through the StreamTableEnvironment with env.sqlQuery(sql), no problem there. The key step is the StatementSet's sta.execute()

Re: In an advertising use case, how to replace some Spark processing logic with Flink - is a ProcessFunction needed?

2021-08-18 Thread 东东
So this is about handling out-of-order data, i.e. giving up if the join still fails after 10 retries? In Flink this would be a two-stream interval join; the watermark setting needs some thought - if you are not latency-sensitive, just use 30 minutes, and if you are, you could do tiered retries. Pure speculation. On 2021-08-18 10:25:49, "张锴" wrote: >Requirement: >We need to replace the current Spark program with Flink. While working through the logic, there is one part I don't know how to implement in Flink; the Spark job runs in batches of three minutes. >Details:
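
A minimal sketch of the interval join suggested above, using placeholder Tuple2<adId, payload> streams and the 30-minute bound:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class AdIntervalJoin {

    // impressions and clicks are (adId, payload) pairs; names are placeholders.
    public static DataStream<String> join(
            DataStream<Tuple2<String, String>> impressions,
            DataStream<Tuple2<String, String>> clicks) {
        return impressions
                .keyBy(i -> i.f0)
                .intervalJoin(clicks.keyBy(c -> c.f0))
                // Accept matches up to 30 minutes apart in event time.
                .between(Time.minutes(-30), Time.minutes(30))
                .process(new ProcessJoinFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                    @Override
                    public void processElement(
                            Tuple2<String, String> impression,
                            Tuple2<String, String> click,
                            Context ctx,
                            Collector<String> out) {
                        out.collect(impression.f1 + " joined " + click.f1);
                    }
                });
    }
}
```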