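It's caused by the state held within each 3-minute interval being too large. Either increasing the TaskManager task heap (taskmanager.memory.task.heap.size) or switching to RocksDBStateBackend should solve it.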

yue ma <mayuefi...@gmail.com> wrote on Mon, Mar 28, 2022 at 11:07:

> It looks like the main cause is that the state kept on the heap is too large. I'd suggest switching to RocksDBStateBackend.
>
> yj h <hyjcry...@gmail.com> wrote on Sun, Mar 27, 2022 at 17:07:
>
> > I'd like to ask about a TaskManager OOM. I'm computing daily UV and use ContinuousEventTimeTrigger to fire every 3 minutes.
> >
> > Configuration:
> > 2 machines with 2 cores each, 2 slots per TaskManager, parallelism 4. JobManager and TaskManager memory are left at their default settings.
> >
> > Troubleshooting steps taken so far:
> > 1. At first I only called .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3))); it ran out of memory very quickly.
> > 2. Added an evictor:
> >                 .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3)))
> >                 .evictor(TimeEvictor.of(Time.seconds(0),true))
> >
> > 3. Used .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3)))) instead.
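Steps 2 and 3 both try to drop buffered records after each 3-minute firing. The JDK-only sketch below re-implements, in simplified form, the cutoff rule Flink's TimeEvictor applies: evict every element whose timestamp is <= (largest timestamp in the window - windowSize). With windowSize = 0 and evicting after the window function, every buffered element is removed, which has much the same effect as the FIRE_AND_PURGE that PurgingTrigger produces. Either way only the window contents are cleared; the trigger's per-key timers and the pv/bitMap keyed state are not touched. The class and field names are hypothetical stand-ins, not Flink API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class TimeEvictorSketch {

    /** Hypothetical stand-in for Flink's TimestampedValue: a user id plus its event timestamp. */
    static final class Timestamped {
        final long userId;
        final long timestamp;

        Timestamped(long userId, long timestamp) {
            this.userId = userId;
            this.timestamp = timestamp;
        }
    }

    /**
     * Simplified TimeEvictor cutoff: remove every element whose timestamp is
     * <= (largest timestamp in the window) - windowSizeMs.
     */
    static void evict(List<Timestamped> elements, long windowSizeMs) {
        if (elements.isEmpty()) {
            return;
        }
        long maxTs = Long.MIN_VALUE;
        for (Timestamped e : elements) {
            maxTs = Math.max(maxTs, e.timestamp);
        }
        long cutoff = maxTs - windowSizeMs;
        for (Iterator<Timestamped> it = elements.iterator(); it.hasNext(); ) {
            if (it.next().timestamp <= cutoff) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Timestamped> buffered = new ArrayList<>(List.of(
                new Timestamped(1L, 1_000L),
                new Timestamped(2L, 5_000L),
                new Timestamped(3L, 9_000L)));
        // Models TimeEvictor.of(Time.seconds(0), true): size 0, applied after the window function.
        evict(buffered, 0L);
        System.out.println("remaining: " + buffered.size()); // remaining: 0
    }
}
```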
> >
> > Approaches 2 and 3 still OOM. jstat shows full GCs happening constantly, and the checkpoint size keeps growing, peaking at about 300 MB.
> >
> > Analyzing the heap dump with MAT, the largest memory consumers are
> > org.apache.flink.runtime.state.heap.CopyOnWriteStateMap$StateMapEntry and
> > org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
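On the TimerHeapInternalTimer entries: per key and window, ContinuousEventTimeTrigger registers a timer at the window's end plus one at the next 3-minute-aligned fire time, and Flink keeps only one timer per key, window and timestamp. The JDK-only model below is a rough sketch under those assumptions (hypothetical names; it ignores timers firing and being re-registered, and approximates the trigger's fire-time arithmetic). It suggests that the number of live timers grows with the number of distinct (date, itemId) keys rather than with the number of records, so purging the window contents would not shrink it.

```java
import java.util.HashSet;
import java.util.Set;

public class TimerCountModel {

    static final long INTERVAL_MS = 3 * 60 * 1000L;   // 3-minute ContinuousEventTimeTrigger interval
    static final long DAY_MS = 24 * 60 * 60 * 1000L;  // 1-day tumbling window, offset 0 (UTC)

    /**
     * Counts pending timers after ingesting records given as {itemId, eventTimeMs},
     * assuming the watermark has not yet passed any of them (nothing has fired).
     */
    static int pendingTimers(long[][] records) {
        Set<String> timers = new HashSet<>();        // "key|window|timestamp", deduplicated
        Set<String> hasFireTimer = new HashSet<>();  // (key, window) pairs with a pending early-fire timer
        for (long[] r : records) {
            long item = r[0];
            long ts = r[1];
            long windowStart = ts - Math.floorMod(ts, DAY_MS);
            long windowMaxTs = windowStart + DAY_MS - 1;
            String keyAndWindow = item + "|" + windowStart;
            // End-of-window timer: registered for every element, but identical timers collapse.
            timers.add(keyAndWindow + "|" + windowMaxTs);
            // Next aligned early-fire timer, only if none is pending for this key and window.
            if (hasFireTimer.add(keyAndWindow)) {
                long fireAt = Math.min(ts - Math.floorMod(ts, INTERVAL_MS) + INTERVAL_MS, windowMaxTs);
                timers.add(keyAndWindow + "|" + fireAt);
            }
        }
        return timers.size();
    }

    public static void main(String[] args) {
        long[][] records = {
                {1L, 0L}, {1L, 1_000L}, {1L, 2_000L},  // item 1: three records
                {2L, 500L},                             // item 2: one record
                {3L, 170_000L}};                        // item 3: one record
        System.out.println(pendingTimers(records));     // 6: two per distinct item
    }
}
```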
> >
> > Even with PurgingTrigger it still OOMs. How should I go about troubleshooting this? Any help would be appreciated.
> >
> > Code:
> > // ShoppingRecords and UserClickModel are both plain bean classes
> > inputStream
> >                 .filter(data -> "pv".equals(data.getBehavior()))
> >                 .keyBy(new KeySelector<ShoppingRecords, Tuple2<LocalDate, Long>>() {
> >                     @Override
> >                     public Tuple2<LocalDate, Long> getKey(ShoppingRecords value) throws Exception {
> >                         Instant instant = Instant.ofEpochMilli(value.getTs());
> >                         return Tuple2.of(
> >                                 LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai")).toLocalDate(),
> >                                 value.getItemId()
> >                         );
> >                     }
> >                 })
> >                 .window(TumblingEventTimeWindows.of(Time.days(1)))
> > //                .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3)))
> > //                .evictor(TimeEvictor.of(Time.seconds(0),true))
> >                 .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3))))
> >                 .process(new ProcessWindowFunctionBitMap())
> > //                .addSink(new RedisSink<>(conf, new UvRedisSink()));
> >                 .addSink(new PrintSinkFunction());
> >
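One detail in the pipeline above may be worth checking: the key's date is computed in Asia/Shanghai, but TumblingEventTimeWindows.of(Time.days(1)) aligns windows to UTC midnight (08:00 in Shanghai), so one (date, itemId) key can fall into two daily windows. The JDK-only sketch below re-implements the window-start remainder arithmetic that Flink's TimeWindow.getWindowStartWithOffset uses, to compare offset 0 with offset -8 h; in Flink itself the offset would be passed as TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)). The class name is hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class DailyWindowAlignment {

    static final long DAY_MS = 24 * 60 * 60 * 1000L;
    static final ZoneId SHANGHAI = ZoneId.of("Asia/Shanghai");

    /** Start of the tumbling window containing timestamp (same remainder rule as Flink). */
    static long windowStart(long timestamp, long offsetMs, long sizeMs) {
        long remainder = (timestamp - offsetMs) % sizeMs;
        return remainder < 0 ? timestamp - (remainder + sizeMs) : timestamp - remainder;
    }

    /** The date the quoted key selector computes for this timestamp. */
    static LocalDate shanghaiDate(long timestamp) {
        return Instant.ofEpochMilli(timestamp).atZone(SHANGHAI).toLocalDate();
    }

    public static void main(String[] args) {
        // 2022-03-27 07:00 in Shanghai, i.e. 2022-03-26T23:00Z
        long ts = ZonedDateTime.of(2022, 3, 27, 7, 0, 0, 0, SHANGHAI).toInstant().toEpochMilli();
        long minus8h = -8 * 60 * 60 * 1000L;
        System.out.println(shanghaiDate(ts));                                                     // 2022-03-27
        System.out.println(Instant.ofEpochMilli(windowStart(ts, 0L, DAY_MS)).atZone(SHANGHAI));    // 2022-03-26T08:00+08:00[Asia/Shanghai]
        System.out.println(Instant.ofEpochMilli(windowStart(ts, minus8h, DAY_MS)).atZone(SHANGHAI)); // 2022-03-27T00:00+08:00[Asia/Shanghai]
    }
}
```

With offset 0, a record keyed under 2022-03-27 lands in the window that started at 08:00 on 2022-03-26; with offset -8 h, the window starts at local midnight of the key's own date.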
> >  public static class ProcessWindowFunctionBitMap
> >             extends ProcessWindowFunction<ShoppingRecords, UserClickModel, Tuple2<LocalDate, Long>, TimeWindow> {
> >
> >         private transient ValueState<Integer> pvState;
> >         private transient ValueState<Roaring64NavigableMap> bitMapState;
> >
> >         @Override
> >         public void open(Configuration parameters) throws Exception {
> >             super.open(parameters);
> >             ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv", Integer.class);
> >             ValueStateDescriptor<Roaring64NavigableMap> bitMapStateDescriptor = new ValueStateDescriptor("bitMap"
> >                     , TypeInformation.of(new TypeHint<Roaring64NavigableMap>() {
> >             }));
> > //            BlockingQueue
> >             // Clean up expired state
> >             StateTtlConfig stateTtlConfig = StateTtlConfig
> >                     .newBuilder(org.apache.flink.api.common.time.Time.days(1))
> >                     .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >                     .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> >                     .build();
> >             // Enable TTL
> >             pvStateDescriptor.enableTimeToLive(stateTtlConfig);
> >             bitMapStateDescriptor.enableTimeToLive(stateTtlConfig);
> >
> >             pvState = this.getRuntimeContext().getState(pvStateDescriptor);
> >             bitMapState = this.getRuntimeContext().getState(bitMapStateDescriptor);
> >
> >         }
> >
> >         @Override
> >         public void process(Tuple2<LocalDate, Long> key,
> >                             ProcessWindowFunction<ShoppingRecords, UserClickModel, Tuple2<LocalDate, Long>, TimeWindow>.Context context,
> >                             Iterable<ShoppingRecords> elements,
> >                             Collector<UserClickModel> out) throws Exception {
> >             // Current pv and uv from state
> >             Integer pv = pvState.value();
> >             Roaring64NavigableMap bitMap = bitMapState.value();
> >             if (bitMap == null) {
> >                 bitMap = new Roaring64NavigableMap();
> >                 pv = 0;
> >             }
> >
> >             Iterator<ShoppingRecords> iterator = elements.iterator();
> >             while (iterator.hasNext()) {
> >                 pv = pv + 1;
> >                 long uid = iterator.next().getUser_id();
> >                 // this works as long as the userId can be converted to a long
> >                 bitMap.add(uid);
> >             }
> >
> >             // Update pv
> >             pvState.update(pv);
> >
> >             UserClickModel UserClickModel = new UserClickModel();
> >
> >             UserClickModel.setDate(key.f0.toString());
> >             UserClickModel.setProduct(key.f1);
> >             UserClickModel.setPv(pv);
> >             UserClickModel.setUv(bitMap.getIntCardinality());
> >
> >             out.collect(UserClickModel);
> >         }
> >     }
> >
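Separately from the OOM, process() above never calls bitMapState.update(bitMap). bitMapState.value() therefore always returns null, so every firing starts from an empty bitmap and resets pv to 0. Once PurgingTrigger (or the evictor) clears the window after each firing, the emitted pv/uv would cover only the latest 3-minute batch rather than the whole day. The JDK-only sketch below models this with a HashSet<Long> standing in for Roaring64NavigableMap and plain fields standing in for the two ValueStates; all names are hypothetical. In the real function the corresponding change would be adding bitMapState.update(bitMap); next to pvState.update(pv);.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UvStateSketch {

    // Stand-ins for the two ValueStates of a single (date, itemId) key.
    private Integer pvState;          // ValueState<Integer>
    private Set<Long> bitMapState;    // ValueState<Roaring64NavigableMap>, modelled as a HashSet
    private final boolean persistBitmap;

    UvStateSketch(boolean persistBitmap) {
        this.persistBitmap = persistBitmap;
    }

    /** One firing over the records buffered since the previous (purging) firing; returns {pv, uv}. */
    int[] fire(List<Long> userIdsSinceLastFiring) {
        Integer pv = pvState;
        Set<Long> bitMap = bitMapState;
        if (bitMap == null) {         // same reset logic as the quoted process()
            bitMap = new HashSet<>();
            pv = 0;
        }
        for (long uid : userIdsSinceLastFiring) {
            pv = pv + 1;
            bitMap.add(uid);
        }
        pvState = pv;
        if (persistBitmap) {
            bitMapState = bitMap;     // the missing bitMapState.update(bitMap)
        }
        return new int[] {pv, bitMap.size()};
    }

    public static void main(String[] args) {
        List<List<Long>> batches = List.of(List.of(1L, 2L), List.of(2L, 3L));
        UvStateSketch asQuoted = new UvStateSketch(false);
        UvStateSketch withUpdate = new UvStateSketch(true);
        for (List<Long> batch : batches) {
            int[] a = asQuoted.fire(batch);
            int[] b = withUpdate.fire(batch);
            System.out.printf("as quoted: pv=%d uv=%d | with update: pv=%d uv=%d%n", a[0], a[1], b[0], b[1]);
        }
    }
}
```

After the second batch the quoted logic reports pv=2, uv=2 (only that batch), while persisting the bitmap gives pv=4, uv=3 for the day so far.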
>
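On the TTL side: the StateTtlConfig in the quoted open() uses OnCreateAndWrite with a 1-day TTL, and process() writes pvState on every firing, so each write pushes that key's expiry out by another day. Flink's state TTL is measured in processing time, and on the heap backend expired entries are removed lazily (on read, or incrementally in the background) rather than the moment they expire. The JDK-only sketch below models only the expiry rule (expired once lastWrite + ttl <= now) for a key last written near the end of its day, under a hypothetical timestamp. In this model a day's (date, itemId) entries can stay on the heap for up to roughly one more day, so at peak the heap may hold about two days' worth of keys.

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class TtlExpirySketch {

    static final Duration TTL = Duration.ofDays(1);

    /** Expiry rule under UpdateType.OnCreateAndWrite: only writes refresh the timestamp. */
    static boolean isExpired(LocalDateTime lastWrite, LocalDateTime now) {
        return !lastWrite.plus(TTL).isAfter(now);   // lastWrite + ttl <= now
    }

    public static void main(String[] args) {
        // Hypothetical: the last firing for the 2022-03-27 key writes pvState at 23:57 that day.
        LocalDateTime lastWrite = LocalDateTime.of(2022, 3, 27, 23, 57);
        System.out.println(isExpired(lastWrite, LocalDateTime.of(2022, 3, 28, 12, 0)));  // false
        System.out.println(isExpired(lastWrite, LocalDateTime.of(2022, 3, 28, 23, 57))); // true
    }
}
```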
