这样写好复杂。弊端和性能方面具体就不清楚,但肯定是可比MapState弱一点的
写个简单的MapState demo吧,如下:
env
.addSource(flinkKafkaConsumer)
.process(new ProcessFunction<String, Object>() {
private static final long serialVersionUID = -8357959184126038977L;
private MapState<String, String> accumulateState;
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, String> accumulateStateDescriptor =
new MapStateDescriptor<>(
"map_state",
StringSerializer.INSTANCE,
StringSerializer.INSTANCE);
accumulateState =
getRuntimeContext().getMapState(accumulateStateDescriptor);
}
@Override
public void processElement(String value, Context ctx, Collector<Object>
out)
throws Exception {
String key = null;
if (accumulateState.contains(key)) {
String 存在的订单号 = accumulateState.get(key);
存在订单号 和 value 合并;
out.collect(合并的订单);
} else {
accumulateState.put(key, value);
}
}
})
;
发件人: 1900
发送时间: 2019年4月2日 20:59
收件人: paullin3280
主题: 回复: 方案询问
MapState 暂时还不知道怎么做,后面继续研究,我现在做了个版本
1.将收到的流分成两份流,一份初始状态的流,一份终态的流
2.watermark用订单的eventtime,采用滑动窗口进行流的切分
3.根据订单号进行合并,采用CoGroupFunction进行流的处理
4.在CoGroupFunction中合并两个流,流1跟流2进行过滤合并,同一个订单号最终只有一条数据,最终变成一个流
不知道现在这样写怎么样?有没有什么弊端?性能怎么样?会不会造成数据丢失什么的?
------------------ 原始邮件 ------------------
发件人: "paullin3280"<[email protected]>;
发送时间: 2019年4月2日(星期二) 下午2:10
收件人: "user-zh"<[email protected]>;
主题: Re: 方案询问
Hi,
推荐可以维护两个 MapState 分别缓存尚未匹配的两种订单。一条订单数据进来首先查找另一种订单的 MapState,若找到则输出合并的数据并删除对应的
entry,否则放入所属订单类型的 MapState。
Best,
Paul Lam
> 在 2019年4月2日,13:46,1900 <[email protected]> 写道:
>
> 现在有个需求,从kafka接收订单信息,每条订单信息有1-2条数据(一般第一条是订单初始状态数据,第二条是订单终态数据);时间间隔不等(一般5秒以内),
> 如何能将数据进行合并,最终合并成一条数据?
>
>
> 现在有一个考虑,根据订单号keyby分组后处理,这样的话是不是开启的窗口太多了?