Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-06 Thread Trystan
Thanks Congxian! To make sure I'm understanding correctly, if I retain 3 incremental checkpoints (say every minute), and I've just completed checkpoint 10, then anything in shared is from checkpoints 8 and 9 only. So anything older than ~3 minutes can safely be deleted? The state from checkpoint 5

Re: Setting up a standalone-mode HA Flink cluster outside an existing Hadoop cluster

2020-05-06 Thread Andrew
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html ---Original message--- From: "wangl...@geekplus.com.cn"

Re: Setting up a standalone-mode HA Flink cluster outside an existing Hadoop cluster

2020-05-06 Thread wangl...@geekplus.com.cn
This doc looks like it should work, I'll give it a try first: https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html wangl...@geekplus.com.cn From: wangl...@geekplus.com.cn Sent: 2020-05-07 12:23 To: user-zh Subject: Setting up a standalone-mode HA Flink cluster outside an existing Hadoop cluster. There is already a Hadoop cluster. I want to deploy, outside this cluster (on different machines, with network connectivity), a

Re: Broadcast stream causing GC overhead limit exceeded

2020-05-06 Thread Eleanore Jin
Hi Fabian, I just got confirmation from the Apache Beam community: Beam will buffer the data until there is data from the broadcast stream. Thanks! Eleanore On Tue, May 5, 2020 at 12:31 AM Fabian Hueske wrote: > Hi Eleanore, > > The "GC overhead limit exceeded" error shows that the JVM spends way too

Re: Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-06 Thread Congxian Qiu
Hi, for the rate limit, could you please try entropy injection [1]. For checkpoints, Flink will handle the file lifecycle (it will delete a file once it will never be used again). A file in the checkpoint directory stays there as long as the corresponding checkpoint is still valid. [1]
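For reference, entropy injection is configured in flink-conf.yaml roughly as follows (the bucket name, marker string and entropy length are placeholder values, not taken from this thread):

    s3.entropy.key: _entropy_
    s3.entropy.length: 4
    state.checkpoints.dir: s3://my-bucket/checkpoints/_entropy_/

Flink replaces the _entropy_ marker with random characters for checkpoint data files, spreading them over many S3 prefixes, while the marker is simply removed for the checkpoint metadata file so that it stays at a predictable location.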

Re: Flink pipeline;

2020-05-06 Thread Leonard Xu
Hi Aissa, looks like your requirement is to enrich real-time stream data (from Kafka) with dimension data (in your case something like: {sensor_id, equipment_id, workshop_id, factory_id}); you can achieve this with the Flink DataStream API or just use Flink SQL. I think using pure SQL will be easier if

Re: The role and use cases of merge in the Flink window function AggregateFunction

2020-05-06 Thread Jingsong Li
Hi, recomputing from scratch is of course correct. One approach is to follow Hive [1]: the agg buffer needs to keep count, sum and variance. Another approach is to separate distinct and variance; have you tried the built-in Flink function directly, e.g. variance(distinct item)? When the built-in function is variance, some special plan-rewrite optimizations are applied. [1]
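As a rough illustration of such a mergeable aggregation buffer (a sketch only, assuming the DataStream AggregateFunction interface; it keeps count, mean and M2 rather than a raw sum, and the class and field names are made up):

    import org.apache.flink.api.common.functions.AggregateFunction;

    // Sketch: incremental variance with a mergeable accumulator (count / mean / M2),
    // combined on merge with the standard parallel-variance formula.
    public class VarianceAggregate implements AggregateFunction<Double, VarianceAggregate.Acc, Double> {

        public static class Acc {
            long count;
            double mean;
            double m2; // sum of squared deviations from the running mean
        }

        @Override
        public Acc createAccumulator() {
            return new Acc();
        }

        @Override
        public Acc add(Double value, Acc acc) {
            acc.count++;
            double delta = value - acc.mean;
            acc.mean += delta / acc.count;
            acc.m2 += delta * (value - acc.mean);
            return acc;
        }

        @Override
        public Double getResult(Acc acc) {
            return acc.count == 0 ? 0.0 : acc.m2 / acc.count; // population variance
        }

        @Override
        public Acc merge(Acc a, Acc b) {
            Acc out = new Acc();
            out.count = a.count + b.count;
            if (out.count == 0) {
                return out;
            }
            double delta = b.mean - a.mean;
            out.mean = (a.count * a.mean + b.count * b.mean) / out.count;
            out.m2 = a.m2 + b.m2 + delta * delta * a.count * b.count / out.count;
            return out;
        }
    }

This avoids keeping every observed value in a set, since the merged M2 can be reconstructed exactly from the two partial buffers.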

Re: The role and use cases of merge in the Flink window function AggregateFunction

2020-05-06 Thread Zhefu PENG
Hi Benchao & Jingsong, thanks for your replies. Indeed, a sliding time window also requires implementing merge. One extra question for Jingsong: in my current scenario, could you give some hints on how to update the resulting variance accurately and efficiently after a merge? My current idea is to update the variance incrementally in add, while also recording every observed value in a set; then, on merge, combine the sets and recompute from scratch to guarantee correctness. (I haven't figured out how to update the merged result accurately in an incremental way.) Any ideas? Thanks, Zhefu On Wed, May 6, 2020 at 22:00

Using flink-connector-kafka-1.9.1 with flink-core-1.7.2

2020-05-06 Thread Nick Bendtner
Hi guys, I am using flink version 1.7.2. I have to deserialize data from kafka into consumer records, therefore I decided to update the flink-connector-kafka to 1.9.1, which provides support for consumer record. We use child-first class loading. However, it seems like I have a compatibility issue as I

Re: Window processing in Stateful Functions

2020-05-06 Thread Oytun Tez
Oops – will follow the thread  -- [image: MotaWord] Oytun Tez M O T A W O R D | CTO & Co-Founder oy...@motaword.com On Wed, May 6, 2020 at 5:37 PM m@xi wrote: > Hello Tez, > > With all due respect, I doubt your answer is related to the question. > >

Statefun 2.0 questions

2020-05-06 Thread Wouter Zorgdrager
Hi all, I've been using Flink for quite some time now and for a university project I'm planning to experiment with statefun. During the walkthrough I've run into some issues that I hope you can help me with. 1) Is it correct that the Docker image of statefun is not yet published? I couldn't find it

Re: Window processing in Stateful Functions

2020-05-06 Thread m@xi
Hello Tez, With all due respect, I doubt your answer is related to the question. *Just to re-phrase a bit*: Assuming we use SF for our application, how can we apply window logic when a function does some processing? *Is there a proper way?* @*Igal*: we would very much appreciate your answer. :)

Re: Rich Function Thread Safety

2020-05-06 Thread Joey Echeverria
I’ve seen a few mailing list posts (including this one) that say Flink guarantees there is no concurrent access to operator methods (e.g. flatMap, snapshotState, etc.) and thus synchronization isn’t needed when writing operators that support checkpointing. I was trying to find a place in the

Re: Window processing in Stateful Functions

2020-05-06 Thread Oytun Tez
I think this is also related to my question about CEP in Statefun. @Annemarie Burger , I was looking into using Siddhi library within the Function's context. -- [image: MotaWord] Oytun Tez M O T A W O R D | CTO & Co-Founder oy...@motaword.com On Wed,

Window processing in Stateful Functions

2020-05-06 Thread Annemarie Burger
Hi, I want to do windowed processing in each function when using Stateful Functions. Is this possible? Some pseudo code would be very helpful! Some more context: I have a stream of edges as input. I want to window these and save the graph representation (either as edge list, adjacency list,

Shared Checkpoint Cleanup and S3 Lifecycle Policy

2020-05-06 Thread Trystan
Hello! Recently we ran into an issue when checkpointing to S3. Because S3 ratelimits based on prefix, the /shared directory would get slammed and cause S3 throttling. There is no solution for this, because /job/checkpoint/:id/shared is all part of the prefix, and is limited to 3,500

Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

2020-05-06 Thread Luis Fernando Amaral
Hello, I'm looking for a way to modify state inside an operator in Flink. I’m following the State Processor API guide - https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#modifying-savepoints

Re: Casting from org.apache.flink.api.java.tuple.Tuple2 to scala.Product; using java and scala in flink job

2020-05-06 Thread Aljoscha Krettek
No, I think that should be all right. On 06.05.20 16:57, Vishwas Siravara wrote: Thanks, I figured that would be the case. I'm using the flink tuple type in the map functions, so there is no casting required now. Can you think of any downsides of using flink tuples in scala code, especially

Re: [Basic definitions] Stateful computation & stateless computation

2020-05-06 Thread dixingxing85
You can take a look at this article: https://ververica.cn/developers/state-management/ and the corresponding video (1.7 state management and fault tolerance): https://ververica.cn/developers/flink-training-course1/ Hope it helps. Original message From: i'mpossible605769...@qq.com To: user-zh user...@flink.apache.org Sent: Sunday, April 26, 2020 16:17 Subject: [Basic definitions] Stateful computation & stateless computation Hi:

Re: Export user metrics with Flink Prometheus endpoint

2020-05-06 Thread Eleanore Jin
Hi Aljoscha, thanks for the response. Yes, the prometheus reporter is already enabled and I am able to get the flink metrics from prometheus. My question is more like: currently I am using the opencensus library to collect application metrics,
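As a side note, one way to expose application metrics through the same endpoint (a sketch only; the counter name and the map function are made up, and this is not a drop-in replacement for the OpenCensus collectors) is to register them on Flink's metric group inside a rich function, so the configured Prometheus reporter picks them up automatically:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;

    public class CountingMap extends RichMapFunction<String, String> {

        private transient Counter processed;

        @Override
        public void open(Configuration parameters) {
            // Metrics registered here are exported by whichever reporters are
            // configured, including the Prometheus reporter.
            processed = getRuntimeContext().getMetricGroup().counter("processedRecords");
        }

        @Override
        public String map(String value) {
            processed.inc();
            return value;
        }
    }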

Re: Casting from org.apache.flink.api.java.tuple.Tuple2 to scala.Product; using java and scala in flink job

2020-05-06 Thread Aljoscha Krettek
Hi, Flink will not do any casting between types. You either need to output the correct (Scala) tuple type from the deserialization schema or insert a step (say a map function) that converts between the two types. The Tuple2 type and the Scala tuple type, i.e. (foo, bar), have nothing in common

Re: MongoDB as a Sink;

2020-05-06 Thread Aljoscha Krettek
Hi, yes, that is correct. You need to implement a SinkFunction. For getting started you can take a look at the Elasticsearch connector because Elasticsearch and MongoDB are roughly similar in terms of how you work with them, i.e. they are both key-value stores. Best, Aljoscha On 06.05.20

Re: Export user metrics with Flink Prometheus endpoint

2020-05-06 Thread Aljoscha Krettek
Hi, that should be possible. Did you have a look at the documentation for setting up a Prometheus metrics reporter: https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter Best, Aljoscha On 06.05.20 03:02,

Re: Autoscaling vs backpressure

2020-05-06 Thread Aljoscha Krettek
I'd say the two can't be considered equivalent because the back pressure does not "reach" back into the source system. It only goes as far back as the Flink source. So if the outside system produces data too fast into the queue from which Flink is reading, this input would keep piling up. Best,

Re: The role and use cases of merge in the Flink window function AggregateFunction

2020-05-06 Thread Jingsong Li
Hi, first of all, whether merge leads to inaccurate results depends entirely on your implementation, and your requirement here does not look difficult to implement. The merge interface was introduced precisely for performance optimization; as Benchao said, if you implement merge, the engine can apply some pane optimizations. Of course, that only pays off if merge actually brings an efficiency gain, which you have to judge from your implementation and tests. Best, Jingsong Lee On Wed, May 6, 2020 at 9:22 PM Benchao Li wrote: > Hi, > > If you use a sliding window, you should also need to implement merge, because the sliding

Re: Flink querying a Hive table to initialize a Savepoint

2020-05-06 Thread Jingsong Li
Hi, there are plans to support Savepoint on BoundedStream later (batch execution built on DataStream). For now, as a workaround, maybe you could consider first writing to files (parquet, orc) with Blink SQL, and then reading them back with the DataSet API? Best, Jingsong Lee On Wed, May 6, 2020 at 7:45 PM Benchao Li wrote: > Hi, > > The Blink planner does not support converting between the Table API and the DataSet API. > The Blink planner is a unified batch/streaming architecture; its batch mode is not implemented on top of the DataSet API. >

Re: Flink pipeline;

2020-05-06 Thread hemant singh
You will have to enrich the incoming data, e.g. { "equipment-id" : "1-234", "sensor-id" : "1-vcy", . }. Since you will most likely have a keyed stream based on equipment-id+sensor-id or equipment-id, you can have a control stream carrying the equipment-to-workshop/factory mapping
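A minimal sketch of that control-stream pattern (assuming hypothetical SensorReading, EquipmentMeta and EnrichedReading POJOs and existing sensorStream / equipmentMetaStream sources; it uses broadcast state rather than a keyed connect, as one possible variant):

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    MapStateDescriptor<String, EquipmentMeta> metaDescriptor = new MapStateDescriptor<>(
            "equipment-meta", Types.STRING, Types.POJO(EquipmentMeta.class));

    // Broadcast the (slowly changing) equipment -> workshop/factory mapping to all tasks.
    BroadcastStream<EquipmentMeta> controlStream = equipmentMetaStream.broadcast(metaDescriptor);

    DataStream<EnrichedReading> enriched = sensorStream
            .connect(controlStream)
            .process(new BroadcastProcessFunction<SensorReading, EquipmentMeta, EnrichedReading>() {

                @Override
                public void processElement(SensorReading reading, ReadOnlyContext ctx,
                                           Collector<EnrichedReading> out) throws Exception {
                    // Look up the broadcast mapping; may be null until the first control record arrives.
                    EquipmentMeta meta = ctx.getBroadcastState(metaDescriptor).get(reading.equipmentId);
                    out.collect(new EnrichedReading(reading, meta));
                }

                @Override
                public void processBroadcastElement(EquipmentMeta meta, Context ctx,
                                                    Collector<EnrichedReading> out) throws Exception {
                    ctx.getBroadcastState(metaDescriptor).put(meta.equipmentId, meta);
                }
            });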

Re: The role and use cases of merge in the Flink window function AggregateFunction

2020-05-06 Thread Benchao Li
Hi, if you use a sliding window, you should also need to implement merge, because the sliding window itself applies a pane optimization: the greatest common divisor of the slide size and the window size is used as the pane (e.g. a 10-minute window with a 4-minute slide gives 2-minute panes), and when a window is triggered, all panes belonging to that window are merged together. Zhefu PENG wrote on Wed, May 6, 2020 at 9:05 PM: > Thanks a lot. So can I understand it as: since I'm using a sliding time > window here, merge will not be invoked, so the merge function will not affect the accuracy of my results? > > On Wed, May

Re: The role and use cases of merge in the Flink window function AggregateFunction

2020-05-06 Thread Zhefu PENG
Thanks a lot. So can I understand it as: since I'm using a sliding time window here, merge will not be invoked, so the merge function will not affect the accuracy of my results? On Wed, May 6, 2020 at 20:49 1193216154 <1193216...@qq.com> wrote: > The official docs say merge is used with session windows, because windows need to be merged > > > > ---Original message--- > From: "Zhefu PENG" Sent: Wednesday, May 6, 2020 8:34 PM > To: "user-zh" Subject:

Re: The role and use cases of merge in the Flink window function AggregateFunction

2020-05-06 Thread 1193216154
The official docs say merge is used with session windows, because windows need to be merged ---Original message--- From: "Zhefu PENG"

The role and use cases of merge in the Flink window function AggregateFunction

2020-05-06 Thread Zhefu PENG
Hi all, when using the incrementally-updating window function AggregateFunction, the merge function has to be implemented. After consulting some material, some sources (blogs) say merge is for combining two ACCs across nodes; someone else said it is for state recovery and is usually not used; other material has no related explanation, so I'd like to ask and confirm its exact role and possible use cases. The reason I need to understand this function is that I have the following requirement: I want to compute the variance of a certain field in the data in a streaming fashion with incremental updates, so that I don't have to store the raw data, only the intermediate result, saving runtime memory.

Re: Flink querying a Hive table to initialize a Savepoint

2020-05-06 Thread Benchao Li
Hi, the Blink planner does not support converting between the Table API and the DataSet API. The Blink planner is a unified batch/streaming architecture; its batch mode is not implemented on top of the DataSet API. 王良 wrote on Wed, May 6, 2020 at 4:41 PM: > I'm using Flink 1.10 and want to initialize a Savepoint from the result of querying a Hive table; the problem I'm running into is that I cannot convert the Table into a DataSet > > EnvironmentSettings settings = >

Flink: submitted jar restarts itself every so often

2020-05-06 Thread 祁森伟
java.util.concurrent.CompletionException: java.lang.Exception: Exception from container-launch. Container id: container_e08_1585816990153_0011_01_22 Exit code: 239 Stack trace: ExitCodeException exitCode=239: at org.apache.hadoop.util.Shell.runCommand(Shell.java:972) at

Re: Re: Re: Re: flink1.9, error reading MapState with the state processor api

2020-05-06 Thread Congxian Qiu
Hello, the image in your last message is broken. Could you try describing the cause of the whole problem, as well as the solution, in text and send it to the mailing list? That can help people who come later: if someone runs into a similar problem, they will be able to find the relevant solution by searching. Best, Congxian guanyq wrote on Thu, Apr 30, 2020 at 4:13 PM: > I've located the problem. > It's related to this keyBy; the latter form can read the MapState, the former one throws an error > > >

Re: checkpointing opening too many files

2020-05-06 Thread Congxian Qiu
Hi, yes, for your use case, if you do not have a large state size, you can try to use FsStateBackend. Best, Congxian ysnakie wrote on Mon, Apr 27, 2020 at 3:42 PM: > Hi > If I use FsStateBackend instead of RocksdbFsStateBackend, will the open > files decrease significantly? I don't have a large state size. > >
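For reference, switching to the filesystem backend can be done in flink-conf.yaml roughly like this (the checkpoint path is a placeholder):

    state.backend: filesystem
    state.checkpoints.dir: hdfs:///flink/checkpoints

With FsStateBackend the working state lives on the TaskManager heap, so this trades the many small local RocksDB files for heap usage; it fits cases where the state comfortably fits in memory.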

Re: Flink: For terabytes of keyed state.

2020-05-06 Thread Congxian Qiu
Hi Gowri, please let us know if you run into any problems~ Best, Congxian Gowri Sundaram wrote on Wed, May 6, 2020 at 1:53 PM: > Hi Congxian, > Thank you so much for your response! We will go ahead and do a POC to test > out how Flink performs at scale. > > Regards, > - Gowri > > On Wed, May 6, 2020 at 8:34 AM

Re: flink 1.10 memory settings

2020-05-06 Thread Xintong Song
The memory allocation is done in MemoryUtils#allocateUnsafe. The most important part there is obtaining the Unsafe object through Unsafe's private static field; that logic is in MemoryUtils#getUnsafe. Thank you~ Xintong Song On Wed, May 6, 2020 at 5:41 PM 蒋佳成(Jiacheng Jiang) <920334...@qq.com> wrote: > hi Xintong, could you tell me which class in Flink contains the code for requesting managed memory? I want to look at Flink's native
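For context, the general reflection trick referred to here, reading Unsafe's private static field, looks roughly like the following generic sketch (not Flink's exact code from MemoryUtils#getUnsafe):

    import java.lang.reflect.Field;
    import sun.misc.Unsafe;

    public final class UnsafeAccess {

        public static Unsafe getUnsafe() {
            try {
                // "theUnsafe" is the private static field holding the singleton instance.
                Field field = Unsafe.class.getDeclaredField("theUnsafe");
                field.setAccessible(true);
                return (Unsafe) field.get(null);
            } catch (NoSuchFieldException | IllegalAccessException e) {
                throw new RuntimeException("Cannot access sun.misc.Unsafe", e);
            }
        }
    }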

Re: flink 1.10 memory settings

2020-05-06 Thread 蒋佳成(Jiacheng Jiang)
hi Xintong, could you tell me which class in Flink contains the code for requesting managed memory? I want to look at Flink's native memory ---Original message--- From: "Xintong Song" http://apache-flink.147419.n8.nabble.com/Flink-tt1869.html

Re: What is the RocksDB local directory in flink checkpointing?

2020-05-06 Thread Till Rohrmann
Hi LakeShen, `state.backend.rocksdb.localdir` defines the directory in which RocksDB will store its local files. Local files are RocksDB's SST and metadata files, for example. This directory does not need to be persisted. If the config option is not configured, then it will use the node's temporary
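For reference, the option is set in flink-conf.yaml roughly like this (paths are placeholders; several local directories can be given comma-separated to spread I/O over multiple disks):

    state.backend: rocksdb
    state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb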

Flink querying a Hive table to initialize a Savepoint

2020-05-06 Thread 王良
I'm using Flink 1.10 and want to initialize a Savepoint from the result of querying a Hive table; the problem I'm running into is that I cannot convert the Table into a DataSet. EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(settings); Table table = tableEnv.sqlQuery("select * from test001");

Re: upgrade flink from 1.9.1 to 1.10.0 on EMR

2020-05-06 Thread Yang Wang
Hi aj, From the logs you have provided, the hadoop version is still 2.4.1. Could you check whether the user jar (i.e. events-processor-1.0-SNAPSHOT.jar) contains some hadoop classes? If it does, you need to exclude the hadoop dependency. Best, Yang aj wrote on Wed, May 6, 2020 at 3:38 PM: > Hello, > > Please help me

Re: MongoDB sink;

2020-05-06 Thread Jingsong Li
Hi, My impression is that MongoDB's API is not complicated. So you can implement a MongoDB sink. Something like: @Override public void invoke(Row value, Context context) throws Exception { Map map = new HashMap<>(); for (int i = 0; i < fieldNames.length; i++) {
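A slightly fuller sketch along those lines (a sketch only: the host, database, collection and the Row-to-Document mapping are made-up values, and the synchronous legacy MongoDB Java driver is assumed):

    import com.mongodb.MongoClient;
    import com.mongodb.client.MongoCollection;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.types.Row;
    import org.bson.Document;

    public class MongoDbSink extends RichSinkFunction<Row> {

        private final String[] fieldNames;
        private transient MongoClient client;
        private transient MongoCollection<Document> collection;

        public MongoDbSink(String[] fieldNames) {
            this.fieldNames = fieldNames;
        }

        @Override
        public void open(Configuration parameters) {
            // Hypothetical connection settings; adjust host, database and collection.
            client = new MongoClient("localhost", 27017);
            collection = client.getDatabase("mydb").getCollection("mycollection");
        }

        @Override
        public void invoke(Row value, Context context) {
            // Map each Row field to a document field by position.
            Document doc = new Document();
            for (int i = 0; i < fieldNames.length; i++) {
                doc.append(fieldNames[i], value.getField(i));
            }
            collection.insertOne(doc);
        }

        @Override
        public void close() {
            if (client != null) {
                client.close();
            }
        }
    }

For production use, batching writes (e.g. insertMany on a small buffer) and coordinating flushes with checkpointing would be worth considering, much like the Elasticsearch connector mentioned earlier in the thread does.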

Re: upgrade flink from 1.9.1 to 1.10.0 on EMR

2020-05-06 Thread aj
Hello, Please help me upgrade to 1.10 in AWS EMR. On Fri, May 1, 2020 at 4:05 PM aj wrote: > Hi Yang, > > I am attaching the logs for your reference, please help me what i am doing > wrong. > > Thanks, > Anuj > > On Wed, Apr 29, 2020 at 9:06 AM Yang Wang wrote: > >> Hi Anuj, >> >> I think the

Re: multiple joins in one job

2020-05-06 Thread Fabian Hueske
You can in fact forward both time attributes because Flink makes sure that the watermark is automatically adjusted to the "slower" of both input streams. You can run the following queries in the SQL CLI client (here taking an example from a Flink SQL training [1]) Flink SQL> CREATE VIEW

Re: MongoDB sink;

2020-05-06 Thread myflink
My solution: first, Flink sinks the data to Kafka; second, MongoDB reads the data from Kafka. Over. ---Original message--- From: "Aissa Elaffani"

MongoDB sink;

2020-05-06 Thread Aissa Elaffani
Hello, I want to sink my data to MongoDB, but as far as I know there is no sink connector for MongoDB. How can I implement a MongoDB sink? If there are any other solutions, I hope you can share them with me.