Hi
现在的实现是这样的,每条数据会在每个窗口中存一份
Best,
Congxian
张浩 <13669299...@163.com> 于2020年7月6日周一 下午12:49写道:
> Hi,all!
> 由于第一次咨询,我不确定上一份邮件大家是否收到。
> 想咨询下大家,为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
> 份?
>
>
> | |
> 张浩
> |
> |
> 13669299...@163.com
> |
> 签名由网易邮箱大师定制
sorry,group agg.
tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7))??.
----
??:"Benchao
code:
val inpurtDS =
streamTableEnv.toAppendStream[BehaviorInfo](behaviorTable)inpurtDS.print()val
pattern = Pattern.begin[BehaviorInfo]("start")
.where(_.clickCount 7)val patternStream = CEP.pattern(inpurtDS, pattern)
val result: DataStream[BehaviorInfo] = patternStream.process(
new
我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。
这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
x <35907...@qq.com> 于2020年7月6日周一 上午11:15写道:
> 版本是1.10.1,最后sink的时候确实是一个window里面做count
>
Hi,all!
由于第一次咨询,我不确定上一份邮件大家是否收到。
想咨询下大家,为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
份?
| |
张浩
|
|
13669299...@163.com
|
签名由网易邮箱大师定制
你好,可以参考下ExecutionConfigOptions,OptimizerConfigOptions和GlobalConfiguration,里面有比较清楚地介绍
--
发件人:忝忝向仧 <153488...@qq.com>
发送时间:2020年7月6日(星期一) 12:16
收件人:user-zh
主 题:回复:【Flink的shuffle mode】
pipeline:直接走网络传输,不buffer所有数据
batch:buffer所有数据,结束后一起发送
流一定是pipeline
批可以是pipeline(更好的性能),也可以是batch(更好的容错和更简单的资源申请)
Best,
Jingsong
On Mon, Jul 6, 2020 at 12:16 PM 忝忝向仧 <153488...@qq.com> wrote:
>
>
那就是说datasream默认模式就是pipeline,而批模式是batch,批的模式是存在shuffle情况下,需要等shuffle操作造成,才能发送到下游.那如果批应用有shuffle操作和没有shuffle的,是都要等这个shuffle操作完成了才能一起发给下游,还是说其他非shuffle操作完成了可以先发给下游,不用等shuffle操作完成一起再发送?
发自我的iPhone
-- 原始邮件 --
发件人: Jingsong Li
??sql?? select day,
count(id),
sum(v1) from
(
select
day ,
id ,
sum(v1) v1 from source
group by day,
id
)t
group by day
tConfig.setIdleStateRetentionTime(Time.minutes(1440),Time.minutes(1450))
Hi,
可以通过以下步骤还原车祸现场:
kafka topic: test_action
kafka message:
{"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
代码Problem2.java:
package com.flink;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
??1.10.1??sinkwindow??count
distinct??window??count
distinct??windowgroupDATE_FORMAT(rowtm,
'-MM-dd') sql??
val rt_totaluv_view : Table =
regular join确实是这样,所以量大的话可以用interval join 、temporal join
> 2020年7月5日 下午3:50,忝忝向仧 <153488...@qq.com> 写道:
>
> Hi,all:
>
> 我看源码里写到JoinedStreams:
> 也就是说join时候都是走内存计算的,那么如果某个stream的key值过多,会导致oom
> 那么有什么预防措施呢?
> 将key值多的一边进行打散?
>
>
> Right now, the join is being evaluated in memory so you need to
Hi,
现在就两种:pipeline和batch
batch的话是block住,直到执行完毕才发给下游的,所以这个shuffle mode一般只对批作业有用。
理论上可以per transformation的来设置,see PartitionTransformation.
Best,
Jingsong
On Sun, Jul 5, 2020 at 10:48 PM 忝忝向仧 <153488...@qq.com> wrote:
> Hi,all:
>
>
> 看Flink源码时候,在应用中使用keyBy后,源码的transformations会有shuffle
Hi,
我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
org.apache.flink.table.api.TableExecution: Failed to execute sql
caused by : java.lang.IlleagalStateException: No ExecutorFactory found to
execute the application.
at
Hi,
我现在转换思路,就是在定义表的时候,把ARRYA看成STRING,
那么,现在的问题,就是查询出来,都是空。
基于上面的代码环境,新写了一个类
Problem2.java
package com.flink;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import
Hi,
当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。
https://issues.apache.org/jira/browse/FLINK-17855
Best,
Jark
On Mon, 6 Jul 2020 at 10:19, Jim Chen wrote:
> 大家好:
> 我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。
> 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
> 那么在eval方法接收到的就是Row[],
>
大家好:
我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。
在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
那么在eval方法接收到的就是Row[],
问题出在,Row[]中的数据获取不到,里面的元素都是NULL
通过下面的步骤和代码可还原车祸场景:
kafka topic: test_action
kafka message:
{"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002",
Mark一下,我现在也有这样的需求
> 2020年7月4日 下午12:35,zhisheng 写道:
>
> 我猜你是想要将 table name 作为一个标签方便后期分组查询过滤?
>
> wangl...@geekplus.com.cn 于2020年7月3日周五 上午10:24写道:
>
>> public void invoke(ObjectNode node, Context context) throws Exception {
>>
>>String tableName =
Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
我之前提了个jira 描述了这个问题
https://issues.apache.org/jira/browse/FLINK-18196
修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk 版本,可以参考下面的patch:
https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
Hi,all:
??Flink??keyBy??transformationsshuffle
mode??shuffle modeUNDEFINED
??shuffle mode
.
Hi,all:
??JoinedStreams:
join??stream??key??oom
?
??key
Right now, the join is being evaluated in memory so you need to ensure that the
number
* of elements per key does not get too high.
生产集群 JDK 使用的地方比较多,不敢轻易换版本,后面再观察一下,如果频繁出现这种问题再考虑更换版本,感谢 Congxian
Best!
zhisheng
Congxian Qiu 于2020年7月4日周六 下午3:21写道:
> @zhisheng 你们有尝试过更换 jdk 版本吗?更换版本是否能解决这个问题呢?
>
> Best,
> Congxian
>
>
> zhisheng 于2020年7月4日周六 下午12:27写道:
>
> > 我们也有遇到过这个异常,但是不是很常见
> >
> > Congxian Qiu 于2020年7月3日周五 下午2:08写道:
>
22 matches
Mail list logo