这样写好复杂。弊端和性能方面具体就不清楚,但肯定是可比MapState弱一点的

写个简单的MapState demo吧,如下:
env
    .addSource(flinkKafkaConsumer)
    .process(new ProcessFunction<String, Object>() {
      private static final long serialVersionUID = -8357959184126038977L;

      private MapState<String, String> accumulateState;

      @Override
      public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<String, String> accumulateStateDescriptor =
            new MapStateDescriptor<>(
                "map_state",
                StringSerializer.INSTANCE,
                StringSerializer.INSTANCE);
        accumulateState = 
getRuntimeContext().getMapState(accumulateStateDescriptor);
      }

      @Override
      public void processElement(String value, Context ctx, Collector<Object> 
out)
          throws Exception {
          String key = null;
        if (accumulateState.contains(key)) {
          String  存在的订单号 = accumulateState.get(key);
          存在订单号 和 value 合并;
    out.collect(合并的订单);
        } else {
          accumulateState.put(key, value);
        }
      }
    })
;



发件人: 1900
发送时间: 2019年4月2日 20:59
收件人: paullin3280
主题: 回复: 方案询问

MapState 暂时还不知道怎么做,后面继续研究,我现在做了个版本


1.将收到的流分成两份流,一份初始状态的流,一份终态的流
2.watermark用订单的eventtime,采用滑动窗口进行流的切分
3.根据订单号进行合并,采用CoGroupFunction进行流的处理
4.在CoGroupFunction中合并两个流,流1跟流2进行过滤合并,同一个订单号最终只有一条数据,最终变成一个流


不知道现在这样写怎么样?有没有什么弊端?性能怎么样?会不会造成数据丢失什么的?


------------------ 原始邮件 ------------------
发件人: "paullin3280"<paullin3...@gmail.com>;
发送时间: 2019年4月2日(星期二) 下午2:10
收件人: "user-zh"<user-zh@flink.apache.org>;

主题: Re: 方案询问



Hi,

推荐可以维护两个 MapState 分别缓存尚未匹配的两种订单。一条订单数据进来首先查找另一种订单的 MapState,若找到则输出合并的数据并删除对应的 
entry,否则放入所属订单类型的 MapState。

Best,
Paul Lam

> 在 2019年4月2日,13:46,1900 <575209...@qq.com> 写道:
> 
> 现在有个需求,从kafka接收订单信息,每条订单信息有1-2条数据(一般第一条是订单初始状态数据,第二条是订单终态数据);时间间隔不等(一般5秒以内),
> 如何能将数据进行合并,最终合并成一条数据?
> 
> 
> 现在有一个考虑,根据订单号keyby分组后处理,这样的话是不是开启的窗口太多了?

回复