Re: Flink 1.9 Blink planner 时间字段问题

2019-09-05 文章 Jark Wu
可能是因为你在 schema 中把 eventTime 声明成了 timestamp 类型,你可以声明成 long 试试。 
.field("_rowtime", Types.LONG())

> 在 2019年9月5日,15:11,hb <343122...@163.com> 写道:
> 
> 实际应用中, 时间字段最常用的就是Long类型的毫秒时间戳, 难道这个不支持么.



?????? Re: Re: flink1.9??blinkSQL??????udf??TIMESTAMP????????

2019-09-05 文章 ????
override getResultType


public TypeInformation

答复: Re: 如何优化flink内存?

2019-09-05 文章 戴嘉诚
对,我这边使用的也是相同的操作

发件人: 陈赋赟
发送时间: 2019年9月5日 16:08
收件人: user-zh@flink.apache.org
主题: Re:Re: 如何优化flink内存?

HI
  我在项目中有遇到过类似的情况,我说下我的想法和思路。
  伊始是需要统计90天事件窗口中用户浏览事件总数,如果是在近30天内有浏览事件则累加1次,在30天内没有浏览事件但在 30天 ~ 
90天内有其他浏览事件则记0次(需求比较奇葩),我们使用了滑动窗口(长度90天 步长1天 
数据进来实时trigger触发计算)因为需要拿到窗口的结束时间所以一开始是用windowProcessFunction去做的聚合统计,这意味着90个窗口每个窗口里都需要缓存着全部的数据而不是一个聚合汇总数据,在线上跑了两天后发现checkpoint
 size已经陡增到20个G并且不久就OOM了。后面想了一下,Flink 
提供的SlideWindow的算法不是闭包可以直接复用,用flatmap对每条数据使用slideWindow得出这条数据对应的90天的窗口结束时间,然后在keyby后使用ProcessFunction,在里面自定义valueState对数据进行聚合汇总,并且在processFunction内部还可以访问TimeService,可以注册清理过期state数据的Timer,并在onTimer回调方法中清理状态。
 以上是我的思路,希望能帮助到你~




祝好





在 2019-09-05 13:43:00,"Yifei Qi"  写道:
>你的意思是自己去实现滑动窗口的功能么?
>
>戴嘉诚  于2019年9月4日周三 下午10:51写道:
>
>> 这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存
>>
>> Yifei Qi 于2019年9月4日 周三20:07写道:
>>
>> > 大家好:
>> >
>> >
>> >
>> > 不知道大家在使用flink时遇到过内存消耗过大的问题么?
>> >
>> >
>> >
>> > 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
>> >
>> >
>> >
>> > 具体情况是这样的:
>> >
>> > 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
>> >
>> > 按照用户进行分组.
>> >
>> > 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
>> >
>> >
>> >
>> >
>> >
>> > flink运行在3个节点后, 内存合计就用了5G.
>> >
>> >
>> >
>> >
>> >
>> > flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
>> >
>> >
>> >
>> >
>> >
>> > 顺祝商祺
>> >
>> >
>> > --
>> >
>> >
>> > Qi Yifei
>> > [image: https://]about.me/qyf404
>> > <
>> >
>> https://about.me/qyf404?promo=email_sig_source=product_medium=email_sig_campaign=gmail_api
>> > >
>> >
>>
>
>
>-- 
>
>
>Qi Yifei
>[image: https://]about.me/qyf404
>



如何统计数据处理延迟Delay情况

2019-09-05 文章 陈赋赟
HI ALL
  目前想对Flink Job添加一个统计数据处理延迟情况的Metric,目前的想法是拿到数据携带的时间(Event 
Time)于当前节点的时间(System.getCurrentTime)相减,得出的值即数据延迟处理的时间,但不确定这个想法是否正确且可行,求各位大佬提供思路和想法~

Re:Re: 如何优化flink内存?

2019-09-05 文章 陈赋赟
HI
  我在项目中有遇到过类似的情况,我说下我的想法和思路。
  伊始是需要统计90天事件窗口中用户浏览事件总数,如果是在近30天内有浏览事件则累加1次,在30天内没有浏览事件但在 30天 ~ 
90天内有其他浏览事件则记0次(需求比较奇葩),我们使用了滑动窗口(长度90天 步长1天 
数据进来实时trigger触发计算)因为需要拿到窗口的结束时间所以一开始是用windowProcessFunction去做的聚合统计,这意味着90个窗口每个窗口里都需要缓存着全部的数据而不是一个聚合汇总数据,在线上跑了两天后发现checkpoint
 size已经陡增到20个G并且不久就OOM了。后面想了一下,Flink 
提供的SlideWindow的算法不是闭包可以直接复用,用flatmap对每条数据使用slideWindow得出这条数据对应的90天的窗口结束时间,然后在keyby后使用ProcessFunction,在里面自定义valueState对数据进行聚合汇总,并且在processFunction内部还可以访问TimeService,可以注册清理过期state数据的Timer,并在onTimer回调方法中清理状态。
 以上是我的思路,希望能帮助到你~




祝好





在 2019-09-05 13:43:00,"Yifei Qi"  写道:
>你的意思是自己去实现滑动窗口的功能么?
>
>戴嘉诚  于2019年9月4日周三 下午10:51写道:
>
>> 这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存
>>
>> Yifei Qi 于2019年9月4日 周三20:07写道:
>>
>> > 大家好:
>> >
>> >
>> >
>> > 不知道大家在使用flink时遇到过内存消耗过大的问题么?
>> >
>> >
>> >
>> > 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
>> >
>> >
>> >
>> > 具体情况是这样的:
>> >
>> > 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
>> >
>> > 按照用户进行分组.
>> >
>> > 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
>> >
>> >
>> >
>> >
>> >
>> > flink运行在3个节点后, 内存合计就用了5G.
>> >
>> >
>> >
>> >
>> >
>> > flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
>> >
>> >
>> >
>> >
>> >
>> > 顺祝商祺
>> >
>> >
>> > --
>> >
>> >
>> > Qi Yifei
>> > [image: https://]about.me/qyf404
>> > <
>> >
>> https://about.me/qyf404?promo=email_sig_source=product_medium=email_sig_campaign=gmail_api
>> > >
>> >
>>
>
>
>-- 
>
>
>Qi Yifei
>[image: https://]about.me/qyf404
>


?????? Re: Re: flink1.9??blinkSQL??????udf??TIMESTAMP????????

2019-09-05 文章 ????





----
??:"user-zh@flink.apache.org Jingso"

Re:回复: Flink 1.9 Blink planner 时间字段问题

2019-09-05 文章 hb
实际应用中, 时间字段最常用的就是Long类型的毫秒时间戳, 难道这个不支持么.




在 2019-09-05 14:06:08,"pengcheng...@bonc.com.cn"  写道:
>FLINK 应该不能把输入的eventTime的long类型转成SQL_TIMESTAMP类型
>
> 
>发件人: hb
>发送时间: 2019-09-05 14:24
>收件人: user-zh
>主题: Flink 1.9 Blink planner 时间字段问题
>代码里定义了kafka connectorDescriptor , 从kafka读取json格式数据, 生成Table
>schema
>.field("_rowtime", Types.SQL_TIMESTAMP())
>.rowtime(
>new Rowtime()
>.timestampsFromField("eventTime")
>.watermarksPeriodicBounded(1000))
>kafka输入:  {"eventTime": 10, "id":1,"name":"hb"}  会报错,
> 
>输入  {"eventTime": "2019-09-02T09:56:16.484Z", "id":1,"name":"hb"} 结果显示正确,
>eventTime 字段怎么不支持数值输入呢.
> 
> 
>错误提示:
>```
>Caused by: java.lang.Exception: java.io.IOException: Failed to deserialize 
>JSON object.
>at 
>org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>at 
>org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>at java.lang.Thread.run(Thread.java:748)
>Caused by: java.io.IOException: Failed to deserialize JSON object.
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
>at 
>org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
>at 
>org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
>at 
>org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>at 
>org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>at 
>org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>at 
>org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
>Caused by: java.time.format.DateTimeParseException: Text '10' could not be 
>parsed at index 0
>at 
>java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
>at 
>org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
>... 7 more
>```
> 
> 
> 
> 
>源码:
>```
>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>  val conf = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>  val tEnv = StreamTableEnvironment.create(env, conf)
>  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 
> 
>  val kafkaIn = new Kafka()
>.version("0.11")
>.topic("hbtest111")
>.property("bootstrap.servers", "192.168.1.160:19092")
>.property("group.id", "test2")
> 
> 
>  val json = new Json().deriveSchema()
> 
> 
>  val schema = new Schema()
>.field("id", Types.INT())
>.field("name", Types.STRING())
> 
> 
>  schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()
>  schema
>.field("_rowtime", Types.SQL_TIMESTAMP())
>.rowtime(
>  new Rowtime()
>.timestampsFromField("eventTime")
>.watermarksPeriodicBounded(1000)
>)
> 
> 
>  
> tEnv.connect(kafkaIn).withFormat(json).withSchema(schema).inAppendMode().registerTableSource("table_from_kafka")
>  val t = tEnv.sqlQuery("select * from table_from_kafka")
>  t.printSchema()
> 
> 
>  t.toRetractStream[Row].print()
>  tEnv.execute("")
>```


Re: Re: flink1.9.0对DDL的支持

2019-09-05 文章 pengcheng...@bonc.com.cn
谢谢你的回答,Wesley Peng.只能在CLI里Create view 还是太不灵活了,期待1.10.


 
发件人: Wesley Peng
发送时间: 2019-09-05 11:52
收件人: user-zh
主题: Re: flink1.9.0对DDL的支持
Hi
 
on 2019/9/5 11:23, pengcheng...@bonc.com.cn wrote:
> 请教一下, 1.flink1.9.0的table API/sql是不是还没有支持Create view?
 
from the official documentation of flink 1.9:
 
Views can also be created within a CLI session using the CREATE VIEW 
statement:
 
CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource;
 
Views created within a CLI session can also be removed again using the 
DROP VIEW statement:
 
DROP VIEW MyNewView;
 
Attention The definition of views in the CLI is limited to the mentioned 
syntax above. Defining a schema for views or escaping whitespaces in 
table names will be supported in future versions.
 
So create view is supported but has the limits.
 
regards.


Flink 1.9 Blink planner 时间字段问题

2019-09-05 文章 hb
代码里定义了kafka connectorDescriptor , 从kafka读取json格式数据, 生成Table
schema
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000))
kafka输入:  {"eventTime": 10, "id":1,"name":"hb"}  会报错,

输入  {"eventTime": "2019-09-02T09:56:16.484Z", "id":1,"name":"hb"} 结果显示正确,
eventTime 字段怎么不支持数值输入呢.


错误提示:
```
Caused by: java.lang.Exception: java.io.IOException: Failed to deserialize JSON 
object.
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to deserialize JSON object.
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.time.format.DateTimeParseException: Text '10' could not be 
parsed at index 0
at 
java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
... 7 more
```




源码:
```
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val conf = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv = StreamTableEnvironment.create(env, conf)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


  val kafkaIn = new Kafka()
.version("0.11")
.topic("hbtest111")
.property("bootstrap.servers", "192.168.1.160:19092")
.property("group.id", "test2")


  val json = new Json().deriveSchema()


  val schema = new Schema()
.field("id", Types.INT())
.field("name", Types.STRING())


  schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()
  schema
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
  new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000)
)


  
tEnv.connect(kafkaIn).withFormat(json).withSchema(schema).inAppendMode().registerTableSource("table_from_kafka")
  val t = tEnv.sqlQuery("select * from table_from_kafka")
  t.printSchema()


  t.toRetractStream[Row].print()
  tEnv.execute("")
```

Re: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

2019-09-05 文章 JingsongLee
override getResultType方法,返回Types.SQL_TIMESTAMP.
这样应该可以绕过。
1.10会修复这个问题。

Best,
Jingsong Lee


--
From:守护 <346531...@qq.com>
Send Time:2019年9月5日(星期四) 12:11
To:user-zh@flink.apache.org JingsongLee ; user-zh 

Subject:回复: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

在哪声明DataType,这个要引入什么包吗,求指点,我的udf代码如下:

import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;


public class UTC2Local extends ScalarFunction {
public Timestamp eval(Timestamp s) {
long timestamp = s.getTime() + 2880;
return new Timestamp(timestamp);
}

}



-- 原始邮件 --
发件人: "JingsongLee";
发送时间: 2019年9月5日(星期四) 中午11:55
收件人: "user-zh";
主题:  Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

你声明了DataType吗?代码怎么写的?
由于目前只支持精度<=3,所以你得用DataTypes.TIMESTAMP(3)来表示。

Best,
Jingsong Lee


--
From:守护 <346531...@qq.com>
Send Time:2019年9月5日(星期四) 11:48
To:user-zh 
Subject:flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

社区的各位大佬好:


使用场景:flink1.9版本使用flinkSQL创建udf函数使用没有问,当切换到blinkSQL使用这个udf就会报错TIMESTAMP类型错误,udf实现的功能也很简单,就是将时间+8小时,报错信息如下


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Not support dataType: TIMESTAMP(9)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at 
java.security.AccessController.doPrivileged(Native Method)
at 
javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9)
at 
org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248)
at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661)
at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669)
at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665)
at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687)
at 
org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
at 
org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)

答复: 如何优化flink内存?

2019-09-05 文章 戴嘉诚
对,你可以自己再state中维持一整天的数据,让后根据时间戳来删除过期数据来替换滑动窗口


发件人: Yifei Qi
发送时间: 2019年9月5日 13:42
收件人: user-zh@flink.apache.org
主题: Re: 如何优化flink内存?

你的意思是自己去实现滑动窗口的功能么?

戴嘉诚  于2019年9月4日周三 下午10:51写道:

> 这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存
>
> Yifei Qi 于2019年9月4日 周三20:07写道:
>
> > 大家好:
> >
> >
> >
> > 不知道大家在使用flink时遇到过内存消耗过大的问题么?
> >
> >
> >
> > 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
> >
> >
> >
> > 具体情况是这样的:
> >
> > 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
> >
> > 按照用户进行分组.
> >
> > 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
> >
> >
> >
> >
> >
> > flink运行在3个节点后, 内存合计就用了5G.
> >
> >
> >
> >
> >
> > flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
> >
> >
> >
> >
> >
> > 顺祝商祺
> >
> >
> > --
> >
> >
> > Qi Yifei
> > [image: https://]about.me/qyf404
> > <
> >
> https://about.me/qyf404?promo=email_sig_source=product_medium=email_sig_campaign=gmail_api
> > >
> >
>


-- 


Qi Yifei
[image: https://]about.me/qyf404