Re: Flink: For terabytes of keyed state.

2020-05-05 Thread Gowri Sundaram
Hi Congxian, Thank you so much for your response! We will go ahead and do a POC to test out how Flink performs at scale. Regards, - Gowri On Wed, May 6, 2020 at 8:34 AM Congxian Qiu wrote: > Hi > > From my experience, you should care about the state size for a single task (not > the whole job state

Autoscaling vs backpressure

2020-05-05 Thread Manish G
Hi, As Flink doesn't provide out-of-the-box support for autoscaling, can backpressure be considered an alternative to it? Autoscaling allows us to add/remove nodes as load goes up/down. With backpressure, if load goes up the system signals upstream to release data more slowly. So we don't need to add

Re: Re: FlinkSQL Retraction: question about the underlying mechanism

2020-05-05 Thread Jingsong Li
Hi, > "The sink table has no primary key or unique key": in that case the more reasonable behavior would be to throw an exception, though that may be awkward to implement. > "If the retraction makes the result 0, a delete is executed; otherwise it's an update": your understanding is completely correct. Best, Jingsong Lee On Wed, May 6, 2020 at 12:39 PM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > > Thanks Jingsong Lee. > > I'm using MySQL, and the sink table has no primary key or unique key. > If the sink

Re: Re: FlinkSQL Retraction: question about the underlying mechanism

2020-05-05 Thread wangl...@geekplus.com.cn
Thanks Jingsong Lee. I'm using MySQL, and the sink table has no primary key or unique key. If I do set a primary or unique key on the sink table, it indeed keeps only two records. I experimented with SET execution.result-mode=changelog in the flink sql-client, marking on the left which Kafka message triggered each row:
+/- tms_company order_cnt
1 + zhongtong 1
2 - zhongtong 1
2 + yuantong 1
3 - yuantong 1
3 + yuantong 2
4 -

Re: flink sql processing time: time zone issue

2020-05-05 Thread Jark Wu
Yes. This is the same problem as with CURRENT_TIMESTAMP, which was asked about before on the mailing list and in JIRA. Changing the return type to WITH LOCAL TIME ZONE is not a small piece of work; we would need to make event time and watermarks support this type. But I think this is high priority and should be fixed in the

Re: What is the RocksDB local directory in flink checkpointing?

2020-05-05 Thread fanrui
Hi LakeShen, By default it uses YARN's tmp dir. Best, fanrui ---- Original mail from: "LakeShen"

What is the RocksDB local directory in flink checkpointing?

2020-05-05 Thread LakeShen
Hi community, Now I have a question about the Flink checkpoint local directory. Our Flink version is 1.6, and the job mode is Flink on YARN per job. I looked at the Flink source code, and I find that the checkpoint local directory is /tmp when you didn't configure "state.backend.rocksdb.localdir". But I go
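For reference, the local directory can be redirected either via state.backend.rocksdb.localdir in flink-conf.yaml or in code. A minimal sketch, not from the thread; the checkpoint URI and local path are placeholders:
```
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbLocalDirExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Durable checkpoint location (URI is a placeholder).
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
        // Local working directory for RocksDB files, instead of the default tmp dir.
        backend.setDbStoragePath("/data/flink/rocksdb");
        env.setStateBackend(backend);
    }
}
```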

Re: Reply: Re: FlinkSQL Retraction: question about the underlying mechanism

2020-05-05 Thread Michael Ran
1. Flink keeps all the results in state/memory. 2. When a new record arrives and the group-by count result (tms_company=1) becomes (tms_company=2), it emits tms_company=1 (old, false) and tms_company=2 (new, true). 3. In memory the old value is discarded and the new one is used in subsequent computation. 4. For external storage (MySQL and the like), the corresponding SQL update is generated. On 2020-05-06 10:36:35, "wangl...@geekplus.com.cn" wrote: > > The update key is tms_company, but this is done through the nested
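To see those (old, false) / (new, true) pairs directly, here is a minimal sketch that prints the retract stream of such an aggregation; the "orders" table is assumed to be registered elsewhere:
```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractStreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // "orders" is assumed to be registered elsewhere (e.g. a Kafka source).
        Table counts = tEnv.sqlQuery(
            "SELECT tms_company, COUNT(*) AS order_cnt FROM orders GROUP BY tms_company");
        // Each update is emitted as (false, oldRow) followed by (true, newRow).
        tEnv.toRetractStream(counts, Row.class).print();
        env.execute("retract-demo");
    }
}
```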

Re: Re: FlinkSQL Retraction: question about the underlying mechanism

2020-05-05 Thread Jingsong Li
Hi, Question 1: deleting data is not solely a retract-stream capability. An upsert stream is a special optimization for downstreams that can overwrite by key; besides overwriting by key, it also has to delete data when the upstream retracts, i.e. an upsert stream receives retract input data too. What JDBC implements is consumption of an upsert stream. Question 2: the correct data should be: 1 {"order_id":1,"tms_company":"zhongtong"} database has 1 record: zhongtong 1 2 {"order_id":1,"tms_company":"yuantong"}

Re: Flink monitoring: some metrics scraped by Prometheus do not contain the job_name or job_id of the Flink job

2020-05-05 Thread Yun Tang
Hi, Metrics such as flink_jobmanager_Status are JobManager-level metrics, which are conceptually different from job-level metrics. Flink supports running multiple jobs in a single JM, but the JM has only one JVM, so adding a tag for an owning job_id to JM metrics would not match the semantics. Also, if there are multiple JMs on one host, Flink currently cannot distinguish them well; only the TM-level tm_id can distinguish different TMs. If you really must add job_name or job_id

Re: Flink: For terabytes of keyed state.

2020-05-05 Thread Congxian Qiu
Hi, From my experience, you should care about the state size for a single task (not the whole job state size); the download speed for a single thread is almost 100 MB/s (this may vary across environments), and I do not have much performance data for loading state into RocksDB (we use an internal KV store in my
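To make the per-task framing concrete, a back-of-the-envelope example using the ~100 MB/s figure above (which will vary by environment): 2 TB of keyed state spread evenly across a parallelism of 200 is about 10 GB per task, so a single-threaded download on restore would take on the order of 100 seconds, before any cost of loading the state into RocksDB.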

Re: multiple joins in one job

2020-05-05 Thread Benchao Li
Yes. The watermark will be propagated correctly; it is the min of the two inputs' watermarks. lec ssmi wrote on Wed, May 6, 2020 at 9:46 AM: > Even if the time attribute field is retained, will the related watermark > be retained? If not, and there is no sql syntax to declare the watermark again, it is > equivalent to not

Re: flink sql processing time: time zone issue

2020-05-05 Thread Jingsong Li
Hi, this may be a bug. Blink uses timestamp WITHOUT time zone by default, so it is time-zone free. But proctime currently produces a time-zone-aware time; my understanding is that it should probably produce a time without a time zone. CC: @Jark Wu @Zhenghua Gao Best, Jingsong Lee On Tue, May 5, 2020 at 5:43 PM 祝尚 <17626017...@163.com> wrote: > Same question, waiting for an answer > > > On May 1, 2020 at 5:26 PM, hb <343122...@163.com> wrote: > > > > > > > >

Re: multiple joins in one job

2020-05-05 Thread lec ssmi
Even if the time attribute field is retained, will the related watermark be retained? If not, and there is no sql syntax to declare the watermark again, it is equivalent to not being able to do multiple joins in one job. Benchao Li wrote on Tue, May 5, 2020 at 9:23 PM: > You cannot select more than one time

Re: Writing _SUCCESS Files (Streaming and Batch)

2020-05-05 Thread Jingsong Li
Hi Peter, The tricky part is how to know the "end" of a bucket in a streaming job. In 1.11, we are trying to implement a watermark-based bucket-ending mechanism [1] in Table/SQL. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table Best, Jingsong

Re: Cannot start native K8s

2020-05-05 Thread Yang Wang
Hi Dongwon Kim, I think it is a known issue. The native Kubernetes integration does not work with JDK 8u252 due to an okhttp issue [1]. For now, you could upgrade your JDK to a newer version to work around it. [1] https://issues.apache.org/jira/browse/FLINK-17416 Dongwon Kim wrote on Wed, May 6, 2020 at 7:15 AM:

Export user metrics with Flink Prometheus endpoint

2020-05-05 Thread Eleanore Jin
Hi all, I just wonder: is it possible to use the Flink metrics endpoint to let Prometheus scrape user-defined metrics? Context: in addition to Flink metrics, we also collect some application-level metrics using OpenCensus, and we run the OpenCensus agent as a sidecar in the Kubernetes pod to collect
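For the Flink side of this question: user-defined metrics registered on the runtime metric group are exposed through whatever reporter is configured, including the Prometheus reporter. A minimal sketch; the metric and class names are made up:
```
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {
    private transient Counter eventCounter;

    @Override
    public void open(Configuration parameters) {
        // Registered metrics are served by the configured reporter(s),
        // e.g. the Prometheus reporter, alongside Flink's built-in metrics.
        eventCounter = getRuntimeContext().getMetricGroup().counter("myEvents");
    }

    @Override
    public String map(String value) {
        eventCounter.inc();
        return value;
    }
}
```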

Casting from org.apache.flink.api.java.tuple.Tuple2 to scala.Product; using java and scala in flink job

2020-05-05 Thread Nick Bendtner
Hi guys, In our Flink job we use a Java source for deserializing a message from Kafka using a Kafka deserializer. The signature is as below. public class CustomAvroDeserializationSchema implements KafkaDeserializationSchema> The other parts of the streaming job are in Scala. When data has to
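The generic parameter of the signature above was cut off by the archive; below is a hypothetical shape of such a schema (the Tuple2<String, byte[]> payload is an illustration, not the poster's actual type). Note that org.apache.flink.api.java.tuple.Tuple2 does not implement scala.Product, which is why a cast to Product on the Scala side fails:
```
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CustomAvroDeserializationSchema
        implements KafkaDeserializationSchema<Tuple2<String, byte[]>> {

    @Override
    public boolean isEndOfStream(Tuple2<String, byte[]> nextElement) {
        return false;
    }

    @Override
    public Tuple2<String, byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) {
        // Real Avro decoding would happen here; this just passes bytes through.
        String key = record.key() == null ? null : new String(record.key());
        return Tuple2.of(key, record.value());
    }

    @Override
    public TypeInformation<Tuple2<String, byte[]>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, byte[]>>() {});
    }
}
```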

MongoDB as a Sink;

2020-05-05 Thread Aissa Elaffani
Hello Guys, I am looking for some help concerning my Flink sink: I want the output to be stored in a MongoDB database. As far as I know, there is no sink connector for MongoDB, so I need to implement one myself, and I don't know how to do that. Can you please help me with this?
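One common approach is a custom RichSinkFunction around the MongoDB Java driver (mongodb-driver-sync). A minimal sketch, with the connection string, database, and collection names as placeholders, and no batching, retries, or upserts:
```
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.bson.Document;

public class MongoDbSink extends RichSinkFunction<Document> {
    private transient MongoClient client;
    private transient MongoCollection<Document> collection;

    @Override
    public void open(Configuration parameters) {
        // Connection string, database, and collection names are placeholders.
        client = MongoClients.create("mongodb://localhost:27017");
        collection = client.getDatabase("sensors").getCollection("readings");
    }

    @Override
    public void invoke(Document value, Context context) {
        collection.insertOne(value); // one write per record in this sketch
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}
```
It would be attached to a stream of Document values with stream.addSink(new MongoDbSink()).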

Cannot start native K8s

2020-05-05 Thread Dongwon Kim
Hi, I'm using Flink 1.10 and tested everything in [1] successfully. While trying [2], I got the following message. Can anyone help, please? [root@DAC-E04-W06 bin]# ./kubernetes-session.sh > 2020-05-06 08:10:49,411 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading >

Flink pipeline;

2020-05-05 Thread Aissa Elaffani
Hello Guys, I am new to the real-time streaming field, and I am trying to build a big data architecture for processing real-time streams. I have some sensors that generate data in JSON format; they are sent to an Apache Kafka cluster, and then I want to consume them with Apache Flink in order to do some

Overriding hadoop core-site.xml keys using the flink-fs-hadoop-shaded assemblies

2020-05-05 Thread Jeff Henrikson
Has anyone had success overriding hadoop core-site.xml keys using the flink-fs-hadoop-shaded assemblies? If so, what versions were known to work? Using btrace, I am seeing a bug in the hadoop shaded dependencies distributed with 1.10.0. Some (but not all) of the core-site.xml keys cannot be

Flink - Hadoop Connectivity - Unable to read file

2020-05-05 Thread Samik Mukherjee
Hi All, I am trying to read a file from a locally installed HDFS, but I am not able to. I tried both of the ways below, but each time the program ends with "Process finished with exit code 239." Any help would be appreciated. public class Processor { public static void
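For comparison, a minimal sketch of reading an HDFS file with the DataSet API; the namenode host/port and path are placeholders, and the Hadoop dependencies must be on the classpath:
```
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class HdfsReadExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // The namenode host/port and path below are placeholders.
        DataSet<String> lines = env.readTextFile("hdfs://localhost:8020/data/input.txt");
        lines.first(10).print();
    }
}
```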

Re: table.show() in Flink

2020-05-05 Thread Fabian Hueske
There's also the Table API approach if you want to avoid typing a "full" SQL query: Table t = tEnv.from("myTable"); Cheers, Fabian On Tue, May 5, 2020 at 4:34 PM, Őrhidi Mátyás < matyas.orh...@gmail.com> wrote: > Thanks guys for the prompt answers! > > On Tue, May 5, 2020 at 2:49 PM Kurt

Re: table.show() in Flink

2020-05-05 Thread Őrhidi Mátyás
Thanks guys for the prompt answers! On Tue, May 5, 2020 at 2:49 PM Kurt Young wrote: > A more straightforward way after FLIP-84 would be: > TableResult result = tEnv.executeSql("select xxx ..."); > result.print(); > > And if you are using 1.10 now, you can use TableUtils#collectToList(table) >

Re: multiple joins in one job

2020-05-05 Thread Benchao Li
You cannot select more than one time attribute; the planner will give you an exception if you do. lec ssmi wrote on Tue, May 5, 2020 at 8:34 PM: > As you said, if I select all the time attribute fields from > both, which will be the final one? > > Benchao Li wrote on Tue, May 5, 2020 at 5:26 PM: >

Re: Restore from savepoint with Iterations

2020-05-05 Thread ashish pok
Let me see if I can add an artificial throttle somewhere. The volume of data is really high, so I'm also trying to avoid extra round trips through Kafka. Looks like the options are "not so elegant" until FLIP-15. Thanks for the pointers again!!! On Monday, May 4, 2020, 11:06 PM, Ken Krugler wrote: Hi Ashish, The

Re: table.show() in Flink

2020-05-05 Thread Kurt Young
A more straightforward way after FLIP-84 would be: TableResult result = tEnv.executeSql("select xxx ..."); result.print(); And if you are using 1.10 now, you can use TableUtils#collectToList(table) to collect the result into a list and then print the rows yourself. Best, Kurt On Tue, May 5, 2020
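A sketch of the 1.10 variant Kurt mentions; the import location of TableUtils is my assumption (it ships with the blink planner in 1.10), and tEnv is an existing table environment with "my_table" registered:
```
import java.util.List;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.utils.TableUtils;
import org.apache.flink.types.Row;

// Collect the (bounded) result into a list and print the rows manually.
Table table = tEnv.sqlQuery("SELECT * FROM my_table");
List<Row> rows = TableUtils.collectToList(table);
rows.forEach(System.out::println);
```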

Re: table.show() in Flink

2020-05-05 Thread Jark Wu
Hi Matyas, AFAIK this is currently the recommended way to print the result of a table. In FLIP-84 [1], which is targeted for 1.11, we will introduce some new APIs to do fluent printing like this: Table table2 = tEnv.sqlQuery("select yy ..."); TableResult result2 = table2.execute();

Re: Reading from sockets using dataset api

2020-05-05 Thread Arvid Heise
Hi Kaan, explicitly mapping to physical nodes is currently not supported and would need some workarounds. I have re-added the user mailing list (please always include it in your responses); maybe someone can help you with that. Best, Arvid On Thu, Apr 30, 2020 at 10:12 AM Kaan Sancak wrote: > One

Re: multiple joins in one job

2020-05-05 Thread lec ssmi
As you said, if I select all the time attribute fields from both, which will be the final one? Benchao Li wrote on Tue, May 5, 2020 at 5:26 PM: > Hi lec, > > You don't need to specify the time attribute again like `TUMBLE_ROWTIME`, you > just select the time attribute field > from one of the

table.show() in Flink

2020-05-05 Thread Őrhidi Mátyás
Dear Flink Community, I'm missing Spark's table.show() method in Flink. I'm using the following alternative at the moment: Table results = tableEnv.sqlQuery("SELECT * FROM my_table"); tableEnv.toAppendStream(results, Row.class).print(); Is this the recommended way to print the content of a table?

Re: flink sql processing time: time zone issue

2020-05-05 Thread 祝尚
Same question here, waiting for an answer. > On May 1, 2020 at 5:26 PM, hb <343122...@163.com> wrote: > > > > ``` code > val env = StreamExecutionEnvironment.getExecutionEnvironment > val settings: EnvironmentSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val tEnv: StreamTableEnvironment =

Flink on Kubernetes unable to Recover from failure

2020-05-05 Thread Morgan Geldenhuys
Community, I am currently doing some fault-tolerance testing for Flink (1.10) running on Kubernetes (1.18) and am encountering an error where, after a running job experiences a failure, the job fails completely. A Flink session cluster has been created according to the documentation

Re: ML/DL via Flink

2020-05-05 Thread m@xi
Hello Becket, I just watched your Flink Forward talk. Really interesting! I'll leave the link here as it is related to this post: AI Flow (FF20). Best, Max -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: multiple joins in one job

2020-05-05 Thread Benchao Li
Hi lec, You don't need to specify the time attribute again like `TUMBLE_ROWTIME`; you just select the time attribute field from one of the inputs, and it will be a time attribute automatically. lec ssmi wrote on Tue, May 5, 2020 at 4:42 PM: > But I have not found there is any syntax to specify time >

Re: Autoscaling flink application

2020-05-05 Thread Manish G
Thanks. It would help. On Tue, May 5, 2020 at 2:12 PM David Anderson wrote: > There's no explicit, out-of-the-box support for autoscaling, but it can be > done. > > Autoscaling came up a few times at the recent Virtual Flink Forward, > including a talk on Autoscaling Flink at Netflix [1] by

Re: Autoscaling flink application

2020-05-05 Thread David Anderson
There's no explicit, out-of-the-box support for autoscaling, but it can be done. Autoscaling came up a few times at the recent Virtual Flink Forward, including a talk on Autoscaling Flink at Netflix [1] by Timothy Farkas. [1] https://www.youtube.com/watch?v=NV0jvA5ZDNc Regards, David On Mon,

Re: multiple joins in one job

2020-05-05 Thread lec ssmi
But I have not found any syntax to specify the time attribute field and watermark again in pure SQL. Fabian Hueske wrote on Tue, May 5, 2020 at 3:47 PM: > Sure, you can write a SQL query with multiple interval joins that preserve > event-time attributes and watermarks. > There's no

Re: multiple joins in one job

2020-05-05 Thread Fabian Hueske
Sure, you can write a SQL query with multiple interval joins that preserve event-time attributes and watermarks. There's no need to feed data back to Kafka just to ingest it again and assign new watermarks. On Tue, May 5, 2020 at 1:45 AM, lec ssmi wrote: > I mean using a pure sql statement to
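A sketch of what that can look like, with two chained interval joins in one query; the table and column names are made up. Selecting a.rowtime straight through keeps it a valid event-time attribute for a later window:
```
import org.apache.flink.table.api.Table;

// tEnv is an existing StreamTableEnvironment with tables a, b, and c registered,
// each exposing an event-time attribute named rowtime.
Table joined = tEnv.sqlQuery(
    "SELECT a.id, a.rowtime, b.b_val, c.c_val " +
    "FROM a " +
    "JOIN b ON a.id = b.id " +
    "  AND a.rowtime BETWEEN b.rowtime - INTERVAL '10' MINUTE AND b.rowtime " +
    "JOIN c ON a.id = c.id " +
    "  AND a.rowtime BETWEEN c.rowtime - INTERVAL '10' MINUTE AND c.rowtime");
```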

Re: Broadcast stream causing GC overhead limit exceeded

2020-05-05 Thread Fabian Hueske
Hi Eleanore, The "GC overhead limit exceeded" error shows that the JVM spends far too much time garbage collecting and recovers only a little memory with each run. Since the program doesn't make any progress in such a situation, it is terminated with the GC overhead error. This typically happens