A group window expects a time attribute for grouping in a stream environment. Thanks

2020-12-08 Thread Appleyuchi
The code is: https://paste.ubuntu.com/p/gVGrj2V7ZF/ The error is: A group window expects a time attribute for grouping in a stream environment. But the data source in the code already has a time attribute. How should the code be modified? Thanks.

Re:Re:Re:Re: Flink SQL reading complex JSON formats

2020-12-08 Thread 破极
Just found it via search, thanks. On 2020-12-09 15:20:07, "hailongwang" <18868816...@163.com> wrote: >http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259 >This mailing list thread has a similar problem; see whether it helps. >PS: 1.12 is about to be released and also supports the Raw type [1]; you can use that type and then post-process with your own UDF in code. Another benefit of the Raw type is that Source consumption won't, because of format

Re:Re:Re: Flink SQL reading complex JSON formats

2020-12-08 Thread hailongwang
http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259 This mailing list thread has a similar problem; see whether it helps. PS: 1.12 is about to be released and also supports the Raw type [1]; you can use that type and then post-process with your own UDF in code. Another benefit of the Raw type is that slow format parsing won't make slow data pulling the job's bottleneck at the Source, since the Source's parallelism can usually be at most the middleware's partition count, e.g. Kafka. [1]

Re: Incomplete data in the Flink HA directory causes the JobManager to fail to start.

2020-12-08 Thread 赵一旦
Found the cause of the second problem, i.e. why the HA directory had nothing in it. That was my own mistake: this container update itself changed the HA directory. So the question becomes: (1) why does this situation make the JobManager fail, and (2) in this case, do I need to delete the information in zk? Deletion looks fairly complicated, since zk doesn't support deleting non-empty directories; do I have to delete the subdirectories one by one? 赵一旦 wrote on Wed, Dec 9, 2020 at 3:07 PM: > Deployed on the company's in-house PaaS platform, 3 machines, the PaaS has built-in recovery. > While running normally, restarting the PaaS container directly caused the job to fail; after the containers restarted, all 3 machines were stuck in a similar infinite loop. >

Re: Incomplete data in the Flink HA directory causes the JobManager to fail to start.

2020-12-08 Thread 赵一旦
Deployed on the company's in-house PaaS platform, 3 machines, the PaaS has built-in recovery. While running normally, restarting the PaaS container directly caused the job to fail; after the containers restarted, all 3 machines were stuck in a similar infinite loop. Preliminary analysis: the JobManager fails to start, the PaaS platform then automatically restarts the container, and it loops forever.

Incomplete data in the Flink HA directory causes the JobManager to fail to start.

2020-12-08 Thread 赵一旦
From the logs, the JobManager recovers jobs after startup, then the process fails. The logs are as follows: 14:55:55.304 [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 14:55:55.305 [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint -

Re: How to define the field type for JsonObject data in FlinkSQL

2020-12-08 Thread xiao cai
Hi Jark, sorry, it's 1.12.0, I typed it wrong. Original Message Sender: Jark Wu Recipient: user-zh Date: Wednesday, Dec 9, 2020 14:40 Subject: Re: How to define the field type for JsonObject data in FlinkSQL Hi 赵一旦, the jackson component already handles this logic automatically. Hi xiaocai, which issue do you need 1.12.1 for? 1.12.0 will be released within the next couple of days. Best, Jark On Wed, 9 Dec 2020 at 14:34, xiao

Re: How to define the field type for JsonObject data in FlinkSQL

2020-12-08 Thread xiao cai
Hi Jark Original Message Sender: Jark Wu Recipient: user-zh Date: Wednesday, Dec 9, 2020 14:40 Subject: Re: How to define the field type for JsonObject data in FlinkSQL Hi 赵一旦, the jackson component already handles this logic automatically. Hi xiaocai, which issue do you need 1.12.1 for? 1.12.0 will be released within the next couple of days. Best, Jark On Wed, 9 Dec 2020 at 14:34, xiao cai wrote: >

Re: How to define the field type for JsonObject data in FlinkSQL

2020-12-08 Thread Jark Wu
Hi 赵一旦, the jackson component already handles this logic automatically. Hi xiaocai, which issue do you need 1.12.1 for? 1.12.0 will be released within the next couple of days. Best, Jark On Wed, 9 Dec 2020 at 14:34, xiao cai wrote: > OK, I plan to upgrade and test next week. Also: when is 1.12.1 planned to be released? > > Original Message > Sender: Jark Wu > Recipient: user-zh > Date: Tuesday, Dec 8, 2020 13:41 > Subject: Re:

Re: How to define the field type for JsonObject data in FlinkSQL

2020-12-08 Thread xiao cai
OK, I plan to upgrade and test next week. Also: when is 1.12.1 planned to be released? Original Message Sender: Jark Wu Recipient: user-zh Date: Tuesday, Dec 8, 2020 13:41 Subject: Re: How to define the field type for JsonObject data in FlinkSQL Defining it as STRING, as hailong said, is supported in 1.12: https://issues.apache.org/jira/browse/FLINK-18002 1.12 will be released within the next couple of days; if you can upgrade, give it a try. Best, Jark On Tue,

Re: Recommendation about RocksDB Metrics ?

2020-12-08 Thread Yun Tang
Hi Kien, From my point of view, RocksDB native metrics could be classified into the 5 parts below, and you can select the ones you're interested in to enable. Enabling those metrics could cause about a 10% performance regression, and this might impact the overall performance as not all jobs are

flink sql execution error: CodeGenException: Unsupported cast from 'ROW' to 'ROW'.

2020-12-08 Thread bigdata
flink 1.10.1, computing error_1006_cnt_permillage. The SQL is: SELECT |DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10' SECOND)), 'yyyy-MM-dd') `day`, |UNIX_TIMESTAMP(DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, INTERVAL '10' SECOND)), 'yyyy-MM-dd

With flink's blink planner, how does CEP SQL express a->b

2020-12-08 Thread ??????
flink cep sql with the blink planner: PATTERN (e1{3 } - e2{1 }?)

Re:Re: Flink SQL reading complex JSON formats

2020-12-08 Thread 破极
For example, messages like these: First message: {"source":"transaction_2020202020200","data":[{"ip":"127.0.0.1"}]} Second message: {"source":"transaction_2020202020200","data":[{"name":"d"},{"age":18}]} Third message: {"source":"transaction_2020202020200","data":[]} I want to use a single field in the CREATE TABLE statement to represent all values of the data attribute. On 2020-12-09

Re: Flink SQL reading complex JSON formats

2020-12-08 Thread Appleyuchi
Dynamic in what way? On 2020-12-09 13:18:56, "破极" wrote: >Hello everyone: >I'm reading Kafka data with Flink SQL, but the data in Kafka is fairly complex: the value of the json data attribute is an array whose contents are dynamic, with no uniform structure. How should the schema be defined in CREATE TABLE? I defined it as an array and wanted to handle it with my own UDF, but asText on the JsonNode cannot retrieve the data. >Any suggestions? Thanks. > > >Sample kafka message (the value of data is dynamic):

Flink SQL reading complex JSON formats

2020-12-08 Thread 破极
Hello everyone: I'm reading Kafka data with Flink SQL, but the data in Kafka is fairly complex: the value of the json data attribute is an array whose contents are dynamic, with no uniform structure. How should the schema be defined in CREATE TABLE? I defined it as an array and wanted to handle it with my own UDF, but asText on the JsonNode cannot retrieve the data. Any suggestions? Thanks. Sample kafka message (the value of data is dynamic): {"source":"transaction_2020202020200","data":[{"name":"d"},{"age":18}]}

A group window expects a time attribute for grouping in a stream environment. THANKS for your help

2020-12-08 Thread Appleyuchi
my code is: https://paste.ubuntu.com/p/gVGrj2V7ZF/ it complains: A group window expects a time attribute for grouping in a stream environment. But the data already has a time attribute. How can I fix it? Thanks for your help.
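
A group window can only group on a declared time attribute, not on an ordinary timestamp column. Below is a minimal sketch of designating one with the 1.11 Table API; the field names and data are hypothetical, not taken from the pasted code.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class TimeAttributeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Tuple2<String, Integer>> orders = env
            .fromElements(Tuple2.of("beer", 3), Tuple2.of("diaper", 4))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                    .withTimestampAssigner((e, ts) -> System.currentTimeMillis()));

        // The .rowtime() call is what turns ts into a time attribute; a plain
        // TIMESTAMP column coming from the source is not enough for a group window.
        Table t = tEnv.fromDataStream(orders, $("product"), $("amount"), $("ts").rowtime());

        Table result = t
            .window(Tumble.over(lit(10).seconds()).on($("ts")).as("w"))
            .groupBy($("product"), $("w"))
            .select($("product"), $("amount").sum());

        // A windowed aggregation is append-only, so toAppendStream works here.
        tEnv.toAppendStream(result, Row.class).print();
        env.execute();
    }
}
```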

what's the meaning of the "true/false" from "groupBy...select"? THANKS

2020-12-08 Thread Appleyuchi
The complete code is: https://paste.ubuntu.com/p/hpWB87kT6P/ The result is: 2> (true,1,diaper,4) 7> (true,3,rubber,2) 4> (true,1,beer,3) 7> (false,3,rubber,2) 7> (true,3,rubber,8) What's the meaning of true/false in the result after running the above code? Thanks for your help~!
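
The flag is the retract marker produced by toRetractStream: true means the row is added to the result, false means a previously emitted row is withdrawn, which is why updating rubber from 2 to 8 shows up as (false,3,rubber,2) followed by (true,3,rubber,8). A minimal sketch with hypothetical data, not the pasted code:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class RetractFlagSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        Table orders = tEnv.fromDataStream(
            env.fromElements(Tuple2.of("rubber", 2), Tuple2.of("rubber", 6)),
            $("product"), $("amount"));

        // A non-windowed aggregate is an updating result, so it can only be
        // converted with toRetractStream, never toAppendStream.
        Table sums = orders
            .groupBy($("product"))
            .select($("product"), $("amount").sum().as("total"));

        // Tuple2.f0: true = add this row, false = retract a row emitted earlier.
        tEnv.toRetractStream(sums, Row.class).print();
        env.execute();
    }
}
```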

On the choice of state backend for N-stream joins with flink cdc: FsStateBackend vs RocksDBStateBackend

2020-12-08 Thread jindy_liu
Scenario: joins over MySQL tables with primary keys (hundreds of millions of rows), producing a real-time wide table (view) on which some rule filtering and counting is done, and the join operators basically have no TTL configured. The MySQL data all has primary keys and the volume won't change much (foreseeably, maybe ~2 million more rows per year), but rows are updated frequently, so processing needs high throughput and low latency. We tested a version of the flink sql job with RocksDB as the backend and found both throughput and latency poor: one data change can take 10 seconds to take effect. With FsStateBackend it is fast and performs well; the difference is more than 10x.
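
For context, a minimal sketch of how the two backends being compared are selected (the checkpoint URI is hypothetical). The observed gap matches their designs: FsStateBackend keeps working state as objects on the heap, while RocksDBStateBackend serializes on every state access in exchange for supporting state larger than memory.

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendChoiceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Heap-based: fast access, but all working state must fit in memory.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // Or RocksDB with incremental checkpoints (second argument): state can
        // exceed memory, but every read/write serializes, hence the slowdown.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
    }
}
```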

Re: Recommendation about RocksDB Metrics ?

2020-12-08 Thread Steven Wu
Just a data point: we actually enabled all RocksDB metrics by default (including very large jobs in terms of parallelism and state size). We didn't see any significant performance impact. There is probably a small impact; at least, it didn't jump out for our workload. On Tue, Dec 8, 2020 at 9:00

flink sql 1.11 kafka cdc to holo sink

2020-12-08 Thread ????
flink sql 1.11 kafka source: the kafka topic carries canal-collected mysql binlog, declared with 'format' = 'canal-json'. Questions: 1. about the source mysql schema: data[{}], table, ts; 2. the topic contains mysql binlog in kafka

Re: flink11 SQL: how to support double-quoted strings

2020-12-08 Thread zhisheng
Is it related to this issue? https://issues.apache.org/jira/browse/FLINK-20537 赵一旦 wrote on Wed, Dec 9, 2020 at 10:17 AM: > MARK, following this. Waiting for replies. > > 莫失莫忘 wrote on Tue, Dec 8, 2020 at 6:49 PM: > > I am migrating hive sql to the flink engine. Many of the original hive sql statements > > write strings with double quotes, e.g. select * from table1 where column1 = > > "word". How can I make flink SQL support double-quoted strings without modifying the SQL? >

Re: Flink 1.10 SQL job exceeding the memory limit and killed by yarn

2020-12-08 Thread kuailuohe
After turning off RocksDB's memory control, should taskmanager.memory.managed.size be set to 0? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Flink 1.11.2 on yarn error

2020-12-08 Thread Jacob
*[Environment]* Flink version: 1.11.2 Hadoop version: 2.6.0-cdh5.8.3 Java version: 1.8.0_144 - *[Command]* [jacob@localhost flink-1.11.2]$ ./bin/yarn-session.sh -jm 1024m -tm 2048m *[Symptom]* 2020-12-08 18:06:00,134 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli []

Re: flink sql real-time computation: UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-12-08 Thread 赵一旦
Old style: 'connector.type' = 'jdbc'; new style: 'connector' = 'jdbc'. The difference: the old version derives the key from the query, while in the new version you just define the key and you get upsert, without the query having to meet particular requirements. Leonard Xu wrote on Mon, Dec 7, 2020 at 5:11 PM: > Hi, > You probably have not subscribed to the flink user mailing list, which is why you cannot see some mails. > You can send a mail with any content to user-zh-subscr...@flink.apache.org user-zh-subscr...@flink.apache.org>
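
A minimal sketch of the new-style declaration; the connection options are hypothetical. With 'connector' = 'jdbc', declaring a PRIMARY KEY on the sink table is what enables upsert writes.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcUpsertSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // PRIMARY KEY ... NOT ENFORCED switches the JDBC sink to upsert mode.
        tEnv.executeSql(
            "CREATE TABLE jdbc_sink (" +
            "  id  BIGINT," +
            "  cnt BIGINT," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector'  = 'jdbc'," +            // new style, not 'connector.type'
            "  'url'        = 'jdbc:mysql://localhost:3306/mydb'," +
            "  'table-name' = 'target_table'" +
            ")");
    }
}
```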

Re: How to define the field type for JsonObject data in FlinkSQL

2020-12-08 Thread 赵一旦
@JarkWu, you mean that if it is defined as string, input data that is a json object is still supported? Wouldn't that require special customization of the json deserialization? For example, field a is string, but in the data the a attribute looks like: "a":{ .. }. In java this amounts to first deserializing the a part into a map, then serializing it back to json (a string) as the value of the string field a. Is that right? Jark Wu wrote on Tue, Dec 8, 2020 at 1:42 PM: > Defining it as STRING, as hailong said, is supported in 1.12, >
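
A sketch of what the 1.12 behavior (FLINK-18002) looks like from the user side, with hypothetical topic and field names: the json format itself re-serializes the nested object into the STRING column, so no custom deserializer is needed.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JsonObjectAsStringSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // In the wire data, a is a JSON object ("a":{...}); from 1.12 the json
        // format re-serializes it into this STRING column automatically, and a
        // UDF can then parse the string however it likes.
        tEnv.executeSql(
            "CREATE TABLE src (" +
            "  id BIGINT," +
            "  a  STRING" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic'     = 'my_topic'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'format'    = 'json'" +
            ")");
    }
}
```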

Re: A question about stream-stream Interval Join

2020-12-08 Thread Benchao Li
Hi macia, 一旦's answer is mostly complete. What the watermark mainly affects is when +(left, null) rows are emitted for left joins that found no match. If both sides have data, everything is emitted normally even if the watermark does not advance. About the watermark: if your event time jumps up and down, the underlying watermark simply takes the largest watermark the current source subtask has seen as that subtask's watermark. But your watermark logic itself delays event time by 10 hours, which already delays the emission of your unmatched rows a lot.
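
For reference, a sketch of the shape of the interval join being discussed, over hypothetical tables orders and payments that both declare WATERMARK FOR ts in their DDL. With a 10-hour watermark delay, the +(left, null) rows for unmatched orders are held back accordingly.

```java
// Sketch only: assumes a TableEnvironment `tEnv` plus tables `orders` and
// `payments` already registered, each with an event-time attribute `ts`.
String intervalJoin =
    "SELECT o.order_id, p.pay_id " +
    "FROM orders o LEFT JOIN payments p " +
    "  ON o.order_id = p.order_id " +
    " AND p.ts BETWEEN o.ts - INTERVAL '10' MINUTE AND o.ts + INTERVAL '10' MINUTE";
tEnv.executeSql(intervalJoin).print();
```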

Re: flink11 SQL: how to support double-quoted strings

2020-12-08 Thread 赵一旦
MARK, following this. Waiting for replies. 莫失莫忘 wrote on Tue, Dec 8, 2020 at 6:49 PM: > I am migrating hive sql to the flink engine. Many of the original hive sql statements > write strings with double quotes, e.g. select * from table1 where column1 = > "word". How can I make flink SQL support double-quoted strings without modifying the SQL? >

Re: In a map operator, write to redis with sadd and then read back with sget

2020-12-08 Thread 赵一旦
I don't get what you're stuck on. Is it that you can't use the redis SDK, or something else? The problem description is too thin. This seems to just be: inside map, operate on redis through a redis client. 追梦的废柴 wrote on Tue, Dec 8, 2020 at 8:44 PM: > Everyone: > Good evening! > There are many examples online of redis as a source or sink, but how, inside a single map operator, do I first write into a set structure and then read that set's size? > The business requires it and I can't figure it out; please point me in the right direction! > Best! > > > | | > 追梦的废柴 > | > | > Email: zhuimeng...@163.com > | > > Signature customized by NetEase Mail

Re: A question about stream-stream Interval Join

2020-12-08 Thread 赵一旦
The key point is whether the watermark advances. If it does not, the left join has no way to know when the right side has no more data, so that it could emit the left-side-only rows. (1) One problem I see here: you define a maxOutOfOrderness of 10 hours for the watermark. Are you sure it needs to be that long? With a maxOutOfOrderness that large, matched rows are emitted promptly, but unmatched rows have to wait 10 hours before the "left side only" output, i.e. the left join result. (2) Another point, which I am not sure about: with the datastream api the watermark propagates normally; I am not sure whether flinkSQL propagates it the same way.

Reply: How to periodically update the value of a broadcast variable

2020-12-08 Thread 熊云昆
The simplest way is a custom Source that periodically checks whether the config file has been updated, and if so emits the file's contents; the downstream operator treats what this source emits as the broadcast variable. | | 熊云昆 | | Email: xiongyun...@163.com | Signature customized by NetEase Mail On Dec 8, 2020 at 17:36, Lei Wang wrote: The flink program reads a config file and broadcasts its contents as a broadcast variable. If the config file is updated, how can the broadcast variable's contents be updated as well? Thanks, 王磊
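
A minimal sketch of that suggestion; the file path, poll interval, and String payload are illustrative only.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Re-reads the config file once a minute and emits it only when it changed;
// the emitted stream can then be fed into a BroadcastProcessFunction.
public class ConfigFileSource implements SourceFunction<String> {

    private final String path;
    private volatile boolean running = true;

    public ConfigFileSource(String path) {
        this.path = path;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        String last = null;
        while (running) {
            String current = new String(Files.readAllBytes(Paths.get(path)));
            if (!current.equals(last)) {   // only emit on change
                ctx.collect(current);
                last = current;
            }
            Thread.sleep(60_000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

Downstream, env.addSource(new ConfigFileSource(path)).broadcast(stateDescriptor) can be connected to the main stream with a BroadcastProcessFunction so every parallel task sees the latest configuration.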

Re: user defined metrics showed in Flink UI but not datadog

2020-12-08 Thread Chesnay Schepler
https://issues.apache.org/jira/browse/FLINK-20533 There is no workaround in the current Flink releases, but you could compile the reporter based on the PR that I opened. On 12/8/2020 10:38 PM, Fanbin Bu wrote: thank you Chesnay. I did verify that count works with datadog. Please link here

Re: can taskmanager listen on all interfaces, like jobmanager?

2020-12-08 Thread Barisa Obradovic
Great, thank you very much :) -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Batch loading into postgres database

2020-12-08 Thread Dylan Forciea
After a bit more playing around with it today, I figured out that what I needed to call was: statementSet.execute().getJobClient().get().getJobExecutionResult(getClass().getClassLoader()).get() The fact that getJobExecutionResult required a classloader is what threw me off. Since I’m using an
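
Spelled out as a sketch (table names hypothetical): execute() returns once the job is submitted, so waiting on the JobClient is what makes the call block until the insert finishes. The ClassLoader argument exists on JobClient.getJobExecutionResult in 1.11; any application class loader works.

```java
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class StatementSetWaitSketch {
    // Assumes source1 and sink1 are already registered on tEnv.
    static void runAndWait(TableEnvironment tEnv) throws Exception {
        StatementSet statementSet = tEnv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");

        TableResult result = statementSet.execute();    // returns once submitted
        result.getJobClient()
              .get()                                    // Optional<JobClient>
              .getJobExecutionResult(StatementSetWaitSketch.class.getClassLoader())
              .get();                                   // block until the job finishes
    }
}
```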

Re: user defined metrics showed in Flink UI but not datadog

2020-12-08 Thread Fanbin Bu
thank you Chesnay. I did verify that count works with datadog. Please link here the ticket once you create it. Meanwhile, is there any workaround for now? Fanbin On Tue, Dec 8, 2020 at 2:56 AM Chesnay Schepler wrote: > It appears that the datadog reporter does not report histograms. I'll

Re: relating tumbling windows to each other

2020-12-08 Thread Marco Villalobos
I need to elaborate on my use case. I would like the SQL api to do aggregation for me in an SQL TUMBLING window. But I want the next window to perform business logic on all the records just aggregated in a DataStream ProcessWindowFunction. This would be a mix of SQL and DataStream API. On Tue,

relating tumbling windows to each other

2020-12-08 Thread Marco Villalobos
GIVEN two windows (ProcessWindowFunction), window A, and window B, AND window A is a tumbling processing time window of 15 minutes AND 20 records entered window A, which performed its business logic. How can I assure that Window B will process exactly all the records that left window A within

Re: [External Sender] Re: ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue

2020-12-08 Thread Piotr Nowojski
Hi, This exception looks like it was thrown by a downstream Task/TaskManager when trying to read a message/packet from some upstream Task/TaskManager and that connection between two TaskManagers was reseted (closed abruptly). So it's the case: > involves communicating with other non-collocated

Re: can taskmanager listen on all interfaces, like jobmanager?

2020-12-08 Thread Khachatryan Roman
I believe it was solved in 1.11 by FLINK-15911 [1] I tried setting taskmanager.rpc.port to 1 for 1.12 and got tcp6 0 0 :::1 :::* LISTEN 13768/java [1] https://issues.apache.org/jira/browse/FLINK-15911 Regards, Roman On Tue, Dec 8, 2020

How User-Defined AggregateFunctions handle deletes of all aggregated rows.

2020-12-08 Thread Rex Fenley
Hello, I'd like to better understand delete behavior of AggregateFunctions. Let's assume there's an aggregate of `user_id` to a set of `group_ids` for groups belonging to that user. `user_id_1 -> [group_id_1, group_id_2, etc.]` Now let's assume sometime later that deletes arrive for all rows
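
A minimal sketch of the retract side of such a function, names hypothetical. As far as I understand the blink planner's group aggregation, each delete arrives as a retraction and retract() runs once per deleted row; when the last input row for a key has been retracted, the planner emits a delete for the result row itself instead of calling getValue() on the emptied accumulator.

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.table.functions.AggregateFunction;

// Collects group_ids per user; the String result is just for illustration.
public class GroupIdSet extends AggregateFunction<String, Set<Long>> {

    @Override
    public Set<Long> createAccumulator() {
        return new HashSet<>();
    }

    public void accumulate(Set<Long> acc, Long groupId) {
        acc.add(groupId);
    }

    // Invoked once per retracted (deleted) input row.
    public void retract(Set<Long> acc, Long groupId) {
        acc.remove(groupId);
    }

    @Override
    public String getValue(Set<Long> acc) {
        return acc.toString();
    }
}
```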

lookup cache clarification

2020-12-08 Thread Marco Villalobos
I set up the following lookup cache values: 'lookup.cache.max-rows' = '20' 'lookup.cache.ttl' = '1min' for a jdbc connector. This table currently only has about 2 records in it. However, since I set the TTL to 1 minute, I expected the job to query that table every minute. The
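
One note, to my understanding: the JDBC lookup cache is read-through rather than polling. An entry expires TTL after it was cached, and it is the next lookup for that key, not a timer, that triggers the re-query. A sketch of the options on a dimension table, connection details hypothetical:

```java
// Assumes a TableEnvironment `tEnv`; any TimeUtils duration string is accepted
// for the TTL, e.g. '1min', '10 s', '1 h'.
tEnv.executeSql(
    "CREATE TABLE dim (" +
    "  id   BIGINT," +
    "  name STRING" +
    ") WITH (" +
    "  'connector'  = 'jdbc'," +
    "  'url'        = 'jdbc:postgresql://localhost:5432/db'," +
    "  'table-name' = 'dim'," +
    "  'lookup.cache.max-rows' = '20'," +
    "  'lookup.cache.ttl'      = '1min'" +
    ")");
```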

Flink jobmanager TLS connectivity to Zookeeper

2020-12-08 Thread Azeem Mufti
I'm trying to figure out a way to make Flink jobmanager (in HA) connect to zookeeper over SSL/TLS. It doesn't seem like there are native properties like Kafka has that support this interaction yet. Is this true or is there some way that I can go about doing this?

Re: [External Sender] Re: ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue

2020-12-08 Thread Kye Bae
Hello, Piotr. Thank you. This is an error logged to the taskmanager just before it became "lost" to the jobmanager (i.e., reported as "lost" in the jobmanager log just before the job restart). In what context would this particular error (not the root-root cause you referred to) be thrown from a

can taskmanager listen on all interfaces, like jobmanager?

2020-12-08 Thread Barisa Obradovic
I've noticed that the jobmanager ports all listen on all interfaces by default, as well as the data port on the taskmanager. The only exception is the taskmanager RPC port, ``` bash-4.2$ netstat -lpn | grep 612 tcp 0 0 172.20.54.176:6121 0.0.0.0:* LISTEN 54/java tcp

Re: 关于 stream-stream Interval Join 的问题

2020-12-08 Thread macia kk
@Benchao Li Thanks for the reply. This problem has bothered me for half a year and has kept me from migrating to Flink; maybe my case is too special. My input topic and schema are as follows, but note that this topic contains the Binlog of two MySQL DBs, and I need to filter out main_db__tansaction_tab and merchant_db__transaction_tab, two tables from the two DBs. So the fields I define here are the union of the two tables' fields. Also note that the event time is create_time, which is a big problem: 1. many tables have create

Re: Recommendation about RocksDB Metrics ?

2020-12-08 Thread Khachatryan Roman
Hi Kien, I am pulling in Yun who might know better. Regards, Roman On Sun, Dec 6, 2020 at 3:52 AM Truong Duc Kien wrote: > Hi all, > > We are thinking about enabling RocksDB metrics to better monitor our > pipeline. However, since they will have performance impact, we will have to > be

Re: How long will keyed state exist if no TTL given?

2020-12-08 Thread Marco Villalobos
Thank you for the clarification. On Tue, Dec 8, 2020 at 8:14 AM Khachatryan Roman wrote: > > Hi Marco, > > Yes, if TTL is not configured then the state will never expire (will stay > forever until deleted explicitly). > > Regards, > Roman > > > On Tue, Dec 8, 2020 at 5:09 PM Marco Villalobos

Re: what are the valid dimensions for lookup.cache.ttl?

2020-12-08 Thread Marco Villalobos
Thank you very much! On Tue, Dec 8, 2020 at 8:26 AM Khachatryan Roman wrote: > > Hi Marco, > > You can find the list of the supported time units in TimeUtils javadoc [1]: > DAYS: "d", "day" > HOURS: "h", "hour" > MINUTES: "min", "minute" > SECONDS: "s", "sec", "second" > MILLISECONDS: "ms",

Re: what are the valid dimensions for lookup.cache.ttl?

2020-12-08 Thread Khachatryan Roman
Hi Marco, You can find the list of the supported time units in TimeUtils javadoc [1]: DAYS: "d", "day" HOURS: "h", "hour" MINUTES: "min", "minute" SECONDS: "s", "sec", "second" MILLISECONDS: "ms", "milli", "millisecond" MICROSECONDS: "µs", "micro", "microsecond" NANOSECONDS: "ns", "nano",

Re: How long will keyed state exist if no TTL given?

2020-12-08 Thread Khachatryan Roman
Hi Marco, Yes, if TTL is not configured then the state will never expire (will stay forever until deleted explicitly). Regards, Roman On Tue, Dec 8, 2020 at 5:09 PM Marco Villalobos wrote: > After reading > > >
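
For completeness, a sketch of opting in to expiry when forever is not what you want; the descriptor name and TTL are illustrative.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlSketch {
    static ValueStateDescriptor<Long> withTtl() {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(7))                               // expire 7 days after last write
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

        ValueStateDescriptor<Long> desc = new ValueStateDescriptor<>("lastSeen", Long.class);
        desc.enableTimeToLive(ttlConfig);   // without this, entries live until clear()
        return desc;
    }
}
```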

Re: Event time issues when Kinesis consumer receives batched messages

2020-12-08 Thread Khachatryan Roman
Thanks, Randal, Yes, I think the only way is to partition the stream the same way as kinesis does (as I wrote before). Regards, Roman On Tue, Dec 8, 2020 at 1:38 PM Randal Pitt wrote: > Hi Roman, > > We're using a custom watermarker that uses a histogram to calculate a "best > fit" event

How long will keyed state exist if no TTL given?

2020-12-08 Thread Marco Villalobos
After reading https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html it is unclear to me how long keyed state will exist if it has no TTL. Is it cached forever, unless explicitly cleared or overwritten? Can somebody please explain to me? Thank you.

Re: Application Mode support on VVP v2.3

2020-12-08 Thread Fabian Paul
Hi Narasimha, I investigated your problem and it is caused by multiple issues. First vvp in general cannot really handle multi job submissions per jar because the complete deployment lifecycle in vvp is scoped around a single Flink job id. Therefore vvp sets a generated Flink job id during

How can I optimize joins or cache misses in SQL api?

2020-12-08 Thread Marco Villalobos
scenario: kafka stream enriched with tables in postgresql. Let's pretend that postgres has organizations, departments, and persons tables, and we want to join the full name onto the kafka table that has the person id. I also want to determine if the person id is missing. This requires a

what are the valid dimensions for lookup.cache.ttl?

2020-12-08 Thread Marco Villalobos
In https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/jdbc.html there are no allowable units specified for lookup.cache.ttl. Can somebody please provide a list of valid values and their meaning? I know 's' for seconds is supported. How do I specify minutes?

In a map operator, write to redis with sadd and then read back with sget

2020-12-08 Thread 追梦的废柴
Everyone: Good evening! There are many examples online of redis as a source or sink, but how, inside a single map operator, do I first write into a set structure and then read that set's size? The business requires it and I can't figure it out; please point me in the right direction! Best! | | 追梦的废柴 | | Email: zhuimeng...@163.com | Signature customized by NetEase Mail

AW: Flink UDF registration from jar at runtime

2020-12-08 Thread Jakub N
Hi Guowei, 1. Unfortunately the UDF and the job are not in the same fatjar. Essentially there is only one "fatjar" containing the Flink environment + the job, the UDF is separate. 2. Yes, that is correct. 3. As explained in 1. I don't submit job jars to the Flink environment,

Re: Event time issues when Kinesis consumer receives batched messages

2020-12-08 Thread Randal Pitt
Hi Roman, We're using a custom watermarker that uses a histogram to calculate a "best fit" event time as the data we receive can be very unordered. As you can see we're using the timestamp from the first event in the batch, so we're essentially sampling the timestamps rather than using them all.

Re: Event time issues when Kinesis consumer receives batched messages

2020-12-08 Thread Khachatryan Roman
Hi Randal, Can you share the code for the 1st approach (FlinkKinesisConsumer.setPeriodicWatermarkAssigner))? I think the 2nd approach (flatMap) can be improved by partitioning the stream the same way kinesis does (i.e. same partition key). Regards, Roman On Mon, Dec 7, 2020 at 2:44 PM Randal

flink sql ddl error: java.lang.IncompatibleClassChangeError: Implementing class

2020-12-08 Thread bigdata
Using flink 1.10.1; the pom dependencies are as follows

Re: Urgent help on S3 CSV file reader DataStream Job

2020-12-08 Thread Wei Zhong
Hi Deep, It seems that the TypeInformation array in your code has 2 elements, but we only need one here. This approach treats the entire csv file as a Row which has only one column, so there should be only one `BasicTypeInfo.STRING_TYPE_INFO` in the array. And if you use the TextInputFormat
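
A sketch of that suggestion, with a hypothetical path. If the lines themselves contain commas, the field delimiter likely needs to be changed to a character that never occurs so the whole line stays in the single field.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.core.fs.Path;

public class SingleColumnCsvSketch {
    static RowCsvInputFormat wholeLineAsString() {
        // Exactly one element: each line maps to a Row with one STRING column.
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
            BasicTypeInfo.STRING_TYPE_INFO
        };
        RowCsvInputFormat format =
            new RowCsvInputFormat(new Path("s3://bucket/data.csv"), fieldTypes);
        // Pick a delimiter that never occurs in the data so commas in the line
        // do not split it into multiple fields.
        format.setFieldDelimiter("\u0001");
        return format;
    }
}
```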

Re: A timestamp problem when writing data to postgres with flink sql

2020-12-08 Thread 李轲
The error message: Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) Caused by: org.apache.flink.table.api.TableException: Unsupported conversion

A timestamp problem when writing data to postgres with flink sql

2020-12-08 Thread 李轲
The project needs to insert data into postgres. With a catalog, inserting seems to require matching the database table definition exactly, and I haven't found a way to insert only some of the fields. Converting a time to TIMESTAMP(6) WITH LOCAL TIME ZONE raises an error; this is the timestamp definition in postgres: select cast(localtimestamp as TIMESTAMP(6) WITH LOCAL TIME ZONE); Is there a conversion method? Or a way to insert only part of the data?

Re: how to register TableAggregateFunction?

2020-12-08 Thread Xingbo Huang
Hi, As far as I know, TableAggregateFunction is not supported yet in batch mode[1]. You can try to use it in stream mode. [1] https://issues.apache.org/jira/browse/FLINK-10978 Best, Xingbo Leonard Xu 于2020年12月8日周二 下午6:05写道: > Hi, appleyuchi > > Sorry for the late reply, > but could you

Re: user defined metrics showed in Flink UI but not datadog

2020-12-08 Thread Chesnay Schepler
It appears that the datadog reporter does not report histograms. I'll file an issue to fix that. On 12/8/2020 4:42 AM, Fanbin Bu wrote: Hi, I followed [1] to define my own metric as: val dropwizardHistogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500)) histogram =
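
For anyone following along, the registration being quoted matches the documented Dropwizard wrapper pattern (it needs the flink-metrics-dropwizard dependency); the histogram then shows up in the Flink UI, while delivery to datadog depends on the reporter fix discussed here.

```java
import com.codahale.metrics.SlidingWindowReservoir;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;

public class HistogramMap extends RichMapFunction<Long, Long> {
    private transient Histogram histogram;

    @Override
    public void open(Configuration parameters) {
        com.codahale.metrics.Histogram dropwizard =
            new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
        histogram = getRuntimeContext()
            .getMetricGroup()
            .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizard));
    }

    @Override
    public Long map(Long value) {
        histogram.update(value);   // record each observation
        return value;
    }
}
```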

flink11 SQL: how to support double-quoted strings

2020-12-08 Thread 莫失莫忘
I am migrating hive sql to the flink engine. Many of the original hive sql statements write strings with double quotes, e.g. select * from table1 where column1 = "word". How can I make flink SQL support double-quoted strings without modifying the SQL? ps: I see that strings in flink SQL must use single quotes, e.g. select * from table1 where column1 = 'word'. How can strings be either single-quoted or double-quoted?

Re: LeaseRenewer threads not released in Flink 1.11

2020-12-08 Thread zilong xiao
The job's data flow is kafka -> flink -> http/prometheus. There are many jobs of this type at the moment, but only those few have the problem, and it reproduces reliably; each time the only option is to restart, and then watch the thread count climb... I'll keep debugging~ Paul Lam wrote on Tue, Dec 8, 2020 at 6:00 PM: > Hi, > The multi-cluster situation I mentioned before mainly refers to writing data to HDFS. If only checkpoints depend on HDFS and this still happens, it is indeed very strange. > > Best, > Paul Lam > > On Dec 8, 2020 at 11:03, zilong xiao wrote: > > Hi Paul, > >

Re: Python UDF filter problem

2020-12-08 Thread László Ciople
Awesome, thanks! On Tue, Dec 8, 2020 at 11:55 AM Xingbo Huang wrote: > Hi, > > This problem has been fixed[1] in 1.12.0,1.10.3,1.11.3, but release-1.11.3 > and release-1.12.0 have not been released yet (VOTE has been passed). I run > your job in release-1.12, and the plan is correct. > > > [1]

Re: flink sql job's sliding window not working

2020-12-08 Thread xushanshan
-- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: Where does FlinkSQL store metadata such as created tables and views (I don't see it using mysql to store metadata the way Hive does)

2020-12-08 Thread 邮件帮助中心
Thanks for your reply!! On 2020-12-08 15:57:32, "Leonard Xu" wrote: >Hi, >Flink's metadata is stored in a catalog, and multiple catalogs are supported (embedded, >Hive, JDBC, custom catalogs). By default Flink uses the built-in GenericInMemoryCatalog, an in- >memory catalog holding all the metadata; in production you can use HiveCatalog > > >Best >Leonard >[1]

Re: how to register TableAggregateFunction?

2020-12-08 Thread Leonard Xu
Hi, appleyuchi Sorry for the late reply, but could you describe your problem more or post your exception stack? The doc you posted has contained the section to define and register the function. And I suggest you post your entire code in the email directly so that it can reproduce the problem, thus the

Re: LeaseRenewer threads not released in Flink 1.11

2020-12-08 Thread Paul Lam
Hi, The multi-cluster situation I mentioned before mainly refers to writing data to HDFS. If only checkpoints depend on HDFS and this still happens, it is indeed very strange. Best, Paul Lam > On Dec 8, 2020 at 11:03, zilong xiao wrote: > > Hi Paul, > The thread names are identical, all user1@cluserA. The HDFS client version is transparent to users; the job uses Flink > 1.11, which seems to use HDFS 2.8.1. The only thing in Flink that continuously interacts with the cluster that I can think of is checkpointing; even with DEBUG logs I couldn't find the root >

Re: Reply: flink with RocksDB incremental checkpoints reports exceeding physical memory after running for a while

2020-12-08 Thread bradyMk
OK, thanks for the explanation~ - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Python UDF filter problem

2020-12-08 Thread Xingbo Huang
Hi, This problem has been fixed[1] in 1.12.0, 1.10.3, and 1.11.3, but release-1.11.3 and release-1.12.0 have not been released yet (the VOTE has passed). I ran your job on release-1.12, and the plan is correct. [1] https://issues.apache.org/jira/browse/FLINK-19675 Best, Xingbo László Ciople

Help: how to integrate CDH's hbase 2.0 with flink 1.11.2 on yarn

2020-12-08 Thread site
Following the officially documented method, integrating hadoop with HADOOP_CLASSPATH=`hadoop classpath` succeeded. Since flink on yarn uses a cdh6 cluster, I wanted to reuse the hbase libraries already on the classpath, using export HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/lib/*:$HADOOP_CLASSPATH Then I created a flink runtime with yarn-session, connected to this container with sql-client, and created an hbase mapping table. This failed: the diagnosis is that the hbase packages cannot be found.

How to periodically update the value of a broadcast variable

2020-12-08 Thread Lei Wang
The flink program reads a config file and broadcasts its contents as a broadcast variable. If the config file is updated, how can the broadcast variable's contents be updated as well? Thanks, 王磊

Python UDF filter problem

2020-12-08 Thread László Ciople
Hello, I am trying to use Flink v1.11.2 with Python and the Table API to read and write back messages to kafka topics. I am trying to filter messages based on the output of a udf which returns a boolean. It seems that Flink ignores the WHERE clause in my queries and every input message is received

Re: ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue

2020-12-08 Thread Piotr Nowojski
Hi Kye, Almost for sure this error is not the primary cause of the failure. This error means that the node reporting it, has detected some fatal failure on the other side of the wire (connection reset by peer), but the original error is somehow too slow or unable to propagate to the JobManager