Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-10 Thread Yangze Guo
Thanks for the reply, Andrey. Regarding building from local dist: - Yes, I bring this up mostly for development purposes. Since k8s is popular, I believe more and more developers would like to test their work on a k8s cluster. I'm not sure whether all developers should write a custom docker file themselves

Re: Re: Can a flink sql join store state and restore data from state?

2020-03-10 Thread Kurt Young
I have opened an issue in the community: https://issues.apache.org/jira/browse/FLINK-16534 You can follow it for further progress. Best, Kurt On Wed, Mar 11, 2020 at 12:54 PM Kurt Young wrote: > The sql client does not support this feature yet. > > Best, > Kurt > > > On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn < > wangl...@geekplus.com.cn> wrote: > >> Hi Kurt, >>

Re: Re: Can a flink sql join store state and restore data from state?

2020-03-10 Thread Kurt Young
The sql client does not support this feature yet. Best, Kurt On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > Hi Kurt, > Indeed, flink cancel -s can be used directly to save the state. > But my job was submitted by writing SQL directly in flink-sql-client; when resubmitting it, how can I specify the state directory so that the job restores from that state? > > Thanks, > 王磊 > > > *Sender:* Kurt Young > *Send

Re: scaling issue Running Flink on Kubernetes

2020-03-10 Thread Xintong Song
Hi Eleanore, That doesn't sound like a scaling issue. It's probably data skew: the data volume on some of the keys is significantly higher than on others. I'm not familiar with this area though, and have copied Jark for you, who is one of the community experts in this area. Thank you~

Re: Re: Can a flink sql join store state and restore data from state?

2020-03-10 Thread wangl...@geekplus.com.cn
Hi Kurt, Indeed, flink cancel -s can be used directly to save the state. But my job was submitted by writing SQL directly in flink-sql-client; when resubmitting it, how can I specify the state directory so that the job restores from that state? Thanks, 王磊 Sender: Kurt Young Send Time: 2020-03-11 10:38 Receiver: user-zh Subject: Re: Can a flink sql join store state and restore data from state? In theory, flink

Re: Question on the SQL "GROUPING SETS" and "CUBE" syntax usability

2020-03-10 Thread Jark Wu
Thanks Arvid for reminding me of this topic. Actually, it is supported in streaming mode in the blink planner (since Flink v1.9), but we missed updating the documentation. You can also see that it is supported in the integration tests [1]. I created an issue to update the docs [2]. Best, Jark [1]:
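
A minimal sketch of the syntax being discussed (table and column names are made up for illustration):

    SELECT supplier_id, rating, COUNT(*) AS cnt
    FROM products
    GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ())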

Re: Re: Kafka sink only support append mode?

2020-03-10 Thread Jark Wu
Hi Lei, Are you trying a regular left join query? Non-time-based operators (e.g. the regular join in your case) emit results before the input is complete; the results are updated as more inputs come in (by emitting upsert/retract messages). But time-based operators (e.g. windowed aggregate,

Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread Jark Wu
Hi Lei, CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9, and the yaml way might be deprecated in the future. With DDL, a registered table can be used as both a source and a sink. Best, Jark [1]:
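
A hedged sketch of such a DDL for a Kafka-backed table (connector keys follow the Flink 1.10 descriptor properties; topic, servers and columns are placeholders borrowed from this thread):

    CREATE TABLE out_order (
      out_order_code STRING,
      input_date BIGINT,
      owner_code STRING,
      status INT
    ) WITH (
      'connector.type' = 'kafka',
      'connector.version' = 'universal',
      'connector.topic' = 'out_order',
      'connector.properties.bootstrap.servers' = 'localhost:9092',
      'format.type' = 'json',
      'update-mode' = 'append'
    );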

Re: When will flink HiveTableSink support writing in streaming mode?

2020-03-10 Thread Kurt Young
It is expected to be ready in 1.11. Best, Kurt On Wed, Mar 11, 2020 at 10:44 AM chenkaibit wrote: > Hi: > I see that https://issues.apache.org/jira/browse/FLINK-14255 introduces a > FileSystemStreamingSink, which seems to be preparation for HiveTableSink to support writing in streaming > mode. In which later release is this feature expected to be officially available? > >

Re: Use flink to calculate sum of the inventory under certain conditions

2020-03-10 Thread Kurt Young
Hi Jiawei, Sorry, I still didn't fully get your question. What's wrong with your proposed SQL? > select vendorId, sum(inventory units) > from dynamodb > where today's time - inbound time > 15 > group by vendorId My guess is that such a query would only trigger calculations on new events. So if a

Re: Setting app Flink logger

2020-03-10 Thread Yang Wang
Since you are using log4j2, the java dynamic property should not be "log4j.configuration". Please use "log4j.configurationFile" instead. Maybe it is not your problem; there is something wrong with the docker image. The log4j2 properties in "flink-console.sh" are not configured correctly. Best,
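
For reference, a sketch of the difference (the path is the one from the official image and may differ in your setup):

    # log4j 1.x style property, ignored by log4j2:
    -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
    # log4j2 style property:
    -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties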

When will flink HiveTableSink support writing in streaming mode?

2020-03-10 Thread chenkaibit
Hi: I see that https://issues.apache.org/jira/browse/FLINK-14255 introduces a FileSystemStreamingSink, which seems to be preparation for HiveTableSink to support writing in streaming mode. In which later release is this feature expected to be officially available?

Re: Can a flink sql join store state and restore data from state?

2020-03-10 Thread Kurt Young
In theory, once a flink SQL job has been compiled into a JobGraph and submitted to the cluster, it is essentially no different from a DataStream job. It should also support flink cancel -s; you can try it first and follow up if you run into any problems. Best, Kurt On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > There are two tables: > tableA: key valueA > tableB: key valueB > > I previously used flink state
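
A minimal sketch of the suggested flow (savepoint path and job id are placeholders):

    # cancel the running job and take a savepoint
    bin/flink cancel -s hdfs:///flink/savepoints <jobId>
    # later, resubmit the job and restore from that savepoint
    bin/flink run -s hdfs:///flink/savepoints/savepoint-<jobId>-xxxxxx <job jar and arguments>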

Re: scaling issue Running Flink on Kubernetes

2020-03-10 Thread Eleanore Jin
Hi Xintong, Thanks for the prompt reply! To answer your question: - Which Flink version are you using? v1.8.2 - Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value? I also tried this, it

Can a flink sql join store state and restore data from state?

2020-03-10 Thread wangl...@geekplus.com.cn
There are two tables: tableA: key valueA tableB: key valueB I previously stored tableA with flink state, and when a tableB message arrived I queried that state to get valueA. Writing flinkSQL directly can also implement this, but there is a time gap between the two tables, so after the job is stopped and resubmitted, part of the join results are lost. Does flinkSQL have something like flink cancel -s to save the state? Thanks, 王磊

Re: Automatically Clearing Temporary Directories

2020-03-10 Thread Yang Wang
Hi David, Currently, the TaskManager can clean up the non-referenced files in the blob cache. It can be configured via `blob.service.cleanup.interval` [1]. Also, when the TaskManager is shut down gracefully, the storage directory will be deleted. So do you stop your TaskManager forcibly (i.e. kill -9)?
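
A minimal flink-conf.yaml sketch of the option referenced above (the value is in seconds; one hour is the default):

    blob.service.cleanup.interval: 3600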

Re: Use flink to calculate sum of the inventory under certain conditions

2020-03-10 Thread Jiawei Wu
Hi Robert, Your answer really helps. About the problem, we have 2 choices. The first one is using Flink as described in this email thread. The second one is using AWS Lambda triggered by a CDC stream to compute the latest 15 days of records, which is a workaround and looks less elegant than

Re: scaling issue Running Flink on Kubernetes

2020-03-10 Thread Xintong Song
Hi Eleanore, I have a few more questions regarding your issue. - Which Flink version are you using? - Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value? - Keeping the job running a while after the scale-up, does the

scaling issue Running Flink on Kubernetes

2020-03-10 Thread Eleanore Jin
Hi Experts, I have my flink application running on Kubernetes, initially with 1 Job Manager and 2 Task Managers. Then we have a custom operator that watches the CRD; when the CRD replicas change, it patches the Flink Job Manager deployment parallelism and max parallelism according to

Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-10 Thread Thomas Weise
Thanks for working on improvements to the Flink Docker container images. This will be important as more and more users are looking to adopt Kubernetes and other deployment tooling that relies on Docker images. A generic, dynamic configuration mechanism based on environment variables is essential

Re: Are incremental checkpoints needed?

2020-03-10 Thread Eleanore Jin
Hi Arvid, Thank you for the clarification! Best, Eleanore On Tue, Mar 10, 2020 at 12:32 PM Arvid Heise wrote: > Hi Eleanore, > > incremental checkpointing would be needed if you have a large state > (GB-TB), but between two checkpoints only little changes happen (KB-MB). > > There are two

Re: Are incremental checkpoints needed?

2020-03-10 Thread Arvid Heise
Hi Eleanore, incremental checkpointing would be needed if you have a large state (GB-TB), but between two checkpoints only little changes happen (KB-MB). There are two reasons for large state: large user state or large operator state coming from joins, windows, or grouping. In the end, you will
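
A minimal sketch of enabling incremental checkpoints with the RocksDB backend (checkpoint path, interval and the trivial pipeline are placeholders):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class IncrementalCheckpointSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // the second constructor argument enables incremental checkpointing
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
            env.enableCheckpointing(60_000); // checkpoint every 60 seconds
            env.fromElements(1, 2, 3).print(); // placeholder pipeline
            env.execute("incremental-checkpoint-sketch");
        }
    }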

Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-10 Thread Andrey Zagrebin
Hi All, Thanks a lot for the feedback! *@Yangze Guo* - Regarding the flink_docker_utils#install_flink function, I think it > should also support build from local dist and build from a > user-defined archive. I suppose you bring this up mostly for development purposes or for power users. Most of

time-windowed joins and tumbling windows

2020-03-10 Thread Vinod Mehra
Hi! We are testing the following 3 way time windowed join to keep the retained state size small. Using joins for the first time here. It works in unit tests but we are not able to get expected results in production. We are still troubleshooting this issue. Can you please help us review this in

Automatically Clearing Temporary Directories

2020-03-10 Thread David Maddison
Hi, When a TaskManager is restarted it can leave behind unreferenced BlobServer cache directories in the temporary storage that never get cleaned up. Would it be safe to automatically clear the temporary storage every time a TaskManager is started? (Note: the temporary volumes in use are

Are incremental checkpoints needed?

2020-03-10 Thread Eleanore Jin
Hi All, I am using Apache Beam to construct the pipeline, and this pipeline is running with Flink Runner. Both Source and Sink are Kafka topics, I have enabled Beam Exactly once semantics. I believe how it works in beam is: the messages will be cached and not processed by the

Re: org.apache.flink.table.planner.PlanningConfigurationBuilder.java

2020-03-10 Thread tison
This file is generated at compile time; please run mvn package from the root directory. Best, tison. jaslou wrote on Tue, Mar 10, 2020 at 11:15 PM: > Hi, > > > While compiling the source, the org.apache.flink.table.planner.PlanningConfigurationBuilder.java class in the flink-table-planner module reports an error: > it cannot find >
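
For reference, a sketch of the build step (FlinkSqlParserImpl is generated into target/generated-sources when flink-sql-parser is built; the module path below is an assumption for the 1.10 source tree):

    # build everything from the repository root
    mvn clean package -DskipTests
    # or only (re)build the SQL parser module plus the modules it depends on
    mvn clean install -DskipTests -pl flink-table/flink-sql-parser -am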

Re: Setting app Flink logger

2020-03-10 Thread miki haiat
Which image are you using? On Tue, Mar 10, 2020, 16:27 Eyal Pe'er wrote: > Hi Rafi, > > The file exists (and is the file from the official image ☺, please see > below). > > The user is root and it has permission. I am running in HA mode using > docker. > > > > cat

org.apache.flink.table.planner.PlanningConfigurationBuilder.java

2020-03-10 Thread jaslou
Hi, While compiling the source, the org.apache.flink.table.planner.PlanningConfigurationBuilder.java class in the flink-table-planner module reports an error: it cannot find the org.apache.flink.sql.parser.impl.FlinkSqlParserImpl file, and the flink-sql-parser module contains neither the impl package nor the FlinkSqlParserImpl file. version: release-1.10.0 Best, Jaslou

RE: Setting app Flink logger

2020-03-10 Thread Eyal Pe'er
Hi Rafi, The file exists (and is the file from the official image☺, please see below). The user is root and it has permission. I am running in HA mode using docker. cat /opt/flink/conf/log4j-console.properties #

Re: History server UI not working

2020-03-10 Thread Yadong Xie
Hi pwestermann, I believe this is related to https://issues.apache.org/jira/browse/FLINK-13799 It seems that configuration.features['web-submit'] is missing from the api when you upgrade from 1.7 to 1.9.2. Do you have the same problem when upgrading to 1.10? Feel free to ping me if you still

Re: Setting app Flink logger

2020-03-10 Thread Rafi Aroch
Hi Eyal, Sounds trivial, but can you verify that the file actually exists at /opt/flink/conf/log4j-console.properties? Also, verify that the user running the process has read permissions on that file. You said you use Flink in YARN mode, but in the example above you run inside a docker image, so

Failure detection and Heartbeats

2020-03-10 Thread Morgan Geldenhuys
Hi community, I am interested in knowing more about the failure detection mechanism used by Flink, unfortunately information is a little thin on the ground and I was hoping someone could shed a little light on the topic. Looking at the documentation

Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn
Thanks, it works now. It seems it was because I added the schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)" under the format label. From: Arvid Heise Date: 2020-03-10 20:51 To: wangl...@geekplus.com.cn CC: user Subject: Re: Does flink-1.10 sql-client support

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread David Anderson
Watermarks are a tool for handling out-of-orderness when working with event time timestamps. They provide a mechanism for managing the tradeoff between latency and completeness, allowing you to manage how long to wait for any out-of-orderness to resolve itself. Note the way that Flink uses these

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread Aljoscha Krettek
On 10.03.20 10:13, kant kodali wrote: If ingestion time programs cannot handle late data then why would they generate watermarks? Isn't the whole point of watermarks to handle late data? Watermarks are not only used for handling late data. Watermarks are the mechanism that is used to

Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread Arvid Heise
Hi Lei, yes Kafka as a sink is supported albeit only for appends (no deletions/updates yet) [1]. An example is a bit hidden in the documentation [2]: tables: - name: MyTableSink type: sink-table update-mode: append connector: property-version: 1 type: kafka
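
A hedged reconstruction of that YAML fragment as it would appear in the sql-client environment file (topic, brokers and schema are placeholders):

    tables:
      - name: MyTableSink
        type: sink-table
        update-mode: append
        connector:
          property-version: 1
          type: kafka
          version: "universal"
          topic: OutputTopic
          properties:
            bootstrap.servers: localhost:9092
        format:
          property-version: 1
          type: json
          derive-schema: true
        schema:
          - name: out_order_code
            type: STRING
          - name: status
            type: INT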

Re: RocksDB

2020-03-10 Thread David Anderson
The State Processor API goes a bit in the direction you are asking about, by making it possible to query savepoints. https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
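
A minimal sketch of reading state from a savepoint with that API (the savepoint path, operator uid and state name are placeholders):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.ExistingSavepoint;
    import org.apache.flink.state.api.Savepoint;

    public class ReadSavepointSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            ExistingSavepoint savepoint =
                Savepoint.load(env, "hdfs:///flink/savepoints/savepoint-xxxxxx", new MemoryStateBackend());
            // read an operator's list state by the uid and state name used in the original job
            DataSet<Long> counts = savepoint.readListState("my-operator-uid", "count-state", Types.LONG);
            counts.print();
        }
    }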

Re: RocksDB

2020-03-10 Thread Aljoscha Krettek
On 10.03.20 11:36, Timothy Victor wrote: Can the RocksDB state backend used by Flink be queried from outside, e.g. via SQL? That's not possible, but you might be interested in queryable state: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html Or

Setting app Flink logger

2020-03-10 Thread Eyal Pe'er
Hi, I am running Flink in YARN mode using the official image with a few additional files. I've noticed that my logger failed to initialize: root:~# docker logs flink-task-manager Starting taskexecutor as a console application on host ***. log4j:WARN No appenders could be found for logger

Re: Hive Source With Kerberos authentication issue

2020-03-10 Thread 叶贤勋
It works inside the doAs method. In my hive connector, all code that accesses Hive and involves authentication now runs inside doAs, which solves the authentication problem. The stacktrace mentioned earlier was printed with the hive-exec jar packaged internally by our company, so it does not match the source code; I see the same problem with the official hive-exec-2.1.1.jar as well. | | 叶贤勋 | | yxx_c...@163.com | Signature customized by NetEase Mail Master On 2020-03-05 13:52, Rui Li wrote:

Re: Re: Kafka sink only support append mode?

2020-03-10 Thread wangl...@geekplus.com.cn
Hi Jark, Thanks for the explanation. The group by statement results in a non-append stream. I have just tried a join statement and want to send the result to kafka; it also gives the error: AppendStreamTableSink requires that Table has only insert changes. Why is the join result not

RocksDB

2020-03-10 Thread Timothy Victor
Can the RocksDB state backend used by Flink be queried from outside, e.g. via SQL? Or maybe a better question: is there a RocksDB SinkFunction that exists? Thanks Tim

Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn
I have configured the source table successfully using the following configuration:
    - name: out_order
      type: source
      update-mode: append
      schema:
        - name: out_order_code
          type: STRING
        - name: input_date
          type: BIGINT
        - name: owner_code
          type: STRING
      connector:

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread kant kodali
Hi Arvid, If ingestion time programs cannot handle late data then why would they generate watermarks? Isn't the whole point of watermarks to handle late data? My last question was more about this library I run several algorithms using

How to deduplicate a dual-stream join with duplicate join keys before sending to kafka

2020-03-10 Thread wangl...@geekplus.com.cn
There are two tables with kafka as the data source: order_info: order_no, info; order_status: order_no, status. order_no has duplicates in both tables, so when a record arrives from one table, multiple records are found in the other table. How can I take only the latest record for that join key from the other table and send it to kafka? The kafka sink only supports append mode, so grouping the tables first and then joining does not work. Thanks, 王磊

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread Arvid Heise
Hi Kant, I just saw that you asked the same question on SO [1]. Could you, in the future, please cross-reference these posts, so that we don't waste resources on answering? [1]

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread Arvid Heise
Hi Kant, according to the documentation [1], you don't need to set a watermark assigner: > Compared to *event time*, *ingestion time* programs cannot handle any > out-of-order events or late data, but the programs don’t have to specify > how to generate *watermarks*. > > Internally, *ingestion
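
A minimal sketch of what the quoted documentation means in code (source and job name are placeholders; note there is no assignTimestampsAndWatermarks call):

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class IngestionTimeSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // timestamps and watermarks are generated automatically at the sources
            env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
            env.socketTextStream("localhost", 9999) // placeholder source
               .print();
            env.execute("ingestion-time-sketch");
        }
    }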

Re: Flink Serialization as stable (kafka) output format?

2020-03-10 Thread Arvid Heise
Hi Theo, I strongly discourage the use of flink serialization for persistent storage of data. It was never intended to work this way and does not offer Avro's benefits of lazy schema evolution and maturity. Unless you can explicitly measure that Avro is a bottleneck in your setup, stick

Re: How to change the flink web-ui jobServer?

2020-03-10 Thread Arvid Heise
Hi LakeShen, you can change the port with conf.setInteger(RestOptions.PORT, 8082); or, if you want to be on the safe side, specify a range: conf.setString(RestOptions.BIND_PORT, "8081-8099"); On Mon, Mar 9, 2020 at 10:47 AM LakeShen wrote: > Hi community, > now I am moving the flink job to
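
Putting the two options into context, a minimal local sketch (port numbers and the trivial pipeline are just examples):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class WebUiPortSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInteger(RestOptions.PORT, 8082);                // fixed port
            // conf.setString(RestOptions.BIND_PORT, "8081-8099");  // or a port range
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            env.fromElements(1, 2, 3).print();
            env.execute("web-ui-port-sketch");
        }
    }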

Re: Question on the SQL "GROUPING SETS" and "CUBE" syntax usability

2020-03-10 Thread Arvid Heise
Hi Weike, according to the linked documentation, the operations are ready, but as you have mentioned only for SQL batch mode, which is not surprising as they don't have well-behaved semantics on streams. See also Calcite's explanations [1]. Could you maybe outline your use case and what you'd

Re: How to print the aggregated state everytime it is updated?

2020-03-10 Thread Arvid Heise
Hi Kant, if you only want to output every second, you probably want to use a ProcessFunction with timers [1]. Basically, this function holds the state and manages the updates to it. The updates should also be stored in a local/non-state variable *changes*. Whenever the timer triggers, you would
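
A minimal sketch of that pattern as a KeyedProcessFunction (it keeps the running aggregate in keyed state rather than the local variable mentioned above, and emits at most once per second per key; types and names are made up):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Emits the running sum per key at most once per second of processing time.
    public class ThrottledSum extends KeyedProcessFunction<String, Long, Long> {

        private transient ValueState<Long> sum;          // aggregated value per key
        private transient ValueState<Boolean> timerSet;  // whether an emit timer is pending

        @Override
        public void open(Configuration parameters) {
            sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Types.LONG));
            timerSet = getRuntimeContext().getState(new ValueStateDescriptor<>("timerSet", Types.BOOLEAN));
        }

        @Override
        public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
            Long current = sum.value();
            sum.update(current == null ? value : current + value);
            if (timerSet.value() == null) {
                // schedule one output per second instead of one per input record
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + 1000);
                timerSet.update(true);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
            out.collect(sum.value());
            timerSet.clear();
        }
    }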

Re: How do I get the value of 99th latency inside an operator?

2020-03-10 Thread Arvid Heise
Hi Felipe, could you use the JMX metrics reporter and tap into the reported values? The proposed hacks are obviously unstable over time. On Fri, Mar 6, 2020 at 1:06 PM Aljoscha Krettek wrote: > Hi, > > I'm afraid you're correct, this is currently not exposed and you would > have to hack
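
For reference, a hedged flink-conf.yaml sketch for enabling the JMX reporter (the port range is a placeholder):

    metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
    metrics.reporter.jmx.port: 8789-8799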

Re: Flink 内存类型相关疑问

2020-03-10 Thread zhisheng
OK, that is clear now, thanks. Xintong Song wrote on Tue, Mar 10, 2020 at 12:43 PM: > Hi Zhisheng, > > First of all, Flink's official memory configuration documentation [1] states clearly that configuring any two or more of > process.size, flink.size, and task.heap.size + managed.size at the same time is not recommended. > > > Explicitly configuring both *total process memory* and *total Flink > > memory* is not recommended. It may lead to