这样写好复杂。弊端和性能方面具体就不清楚,但肯定是可比MapState弱一点的
写个简单的MapState demo吧,如下: env .addSource(flinkKafkaConsumer) .process(new ProcessFunction<String, Object>() { private static final long serialVersionUID = -8357959184126038977L; private MapState<String, String> accumulateState; @Override public void open(Configuration parameters) throws Exception { MapStateDescriptor<String, String> accumulateStateDescriptor = new MapStateDescriptor<>( "map_state", StringSerializer.INSTANCE, StringSerializer.INSTANCE); accumulateState = getRuntimeContext().getMapState(accumulateStateDescriptor); } @Override public void processElement(String value, Context ctx, Collector<Object> out) throws Exception { String key = null; if (accumulateState.contains(key)) { String 存在的订单号 = accumulateState.get(key); 存在订单号 和 value 合并; out.collect(合并的订单); } else { accumulateState.put(key, value); } } }) ; 发件人: 1900 发送时间: 2019年4月2日 20:59 收件人: paullin3280 主题: 回复: 方案询问 MapState 暂时还不知道怎么做,后面继续研究,我现在做了个版本 1.将收到的流分成两份流,一份初始状态的流,一份终态的流 2.watermark用订单的eventtime,采用滑动窗口进行流的切分 3.根据订单号进行合并,采用CoGroupFunction进行流的处理 4.在CoGroupFunction中合并两个流,流1跟流2进行过滤合并,同一个订单号最终只有一条数据,最终变成一个流 不知道现在这样写怎么样?有没有什么弊端?性能怎么样?会不会造成数据丢失什么的? ------------------ 原始邮件 ------------------ 发件人: "paullin3280"<paullin3...@gmail.com>; 发送时间: 2019年4月2日(星期二) 下午2:10 收件人: "user-zh"<user-zh@flink.apache.org>; 主题: Re: 方案询问 Hi, 推荐可以维护两个 MapState 分别缓存尚未匹配的两种订单。一条订单数据进来首先查找另一种订单的 MapState,若找到则输出合并的数据并删除对应的 entry,否则放入所属订单类型的 MapState。 Best, Paul Lam > 在 2019年4月2日,13:46,1900 <575209...@qq.com> 写道: > > 现在有个需求,从kafka接收订单信息,每条订单信息有1-2条数据(一般第一条是订单初始状态数据,第二条是订单终态数据);时间间隔不等(一般5秒以内), > 如何能将数据进行合并,最终合并成一条数据? > > > 现在有一个考虑,根据订单号keyby分组后处理,这样的话是不是开启的窗口太多了?