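看上去主要是 state 在 heap 中太大导致的：MAT 中占用最多的 CopyOnWriteStateMap$StateMapEntry 和 TimerHeapInternalTimer 分别是存放在 JVM 堆上的 keyed state 条目和定时器。建议切换为 RocksDBStateBackend（Flink 1.13 及以后对应 EmbeddedRocksDBStateBackend），把 state 和 timer 都放到堆外内存/本地磁盘上。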
yj h <hyjcry...@gmail.com> 于2022年3月27日周日 17:07写道: > 请教一个taskmanager oom的问题,我在计算一天的uv,采用ContinuousEventTimeTrigger 来3分钟触发一次 > > 配置相关: > 配置是2个机器,每个2核,slots设置的也是每个2,并行度是4,其他jobmanager和taskmanager的内存是默认配置 > > 目前采取的排查步骤: > 1.最开始只调用了.trigger(ContinuousEventTimeTrigger.of(Time.minutes(3))) 很快就oom掉了 > 2.采用了evictor > .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3))) > .evictor(TimeEvictor.of(Time.seconds(0),true)) > > 3.采用了了.trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3)))) > > 2,3的方式同样会oom,jstat 可以看到一只在发生fullgc, checkpoint的大小一直在增大 大概到300m最大 > > mat分析dump 内存占用较大的是 > org.apache.flink.runtime.state.heap.CopyOnWriteStateMap$StateMapEntry和 > org.apache.flink.streaming.api.operators.TimerHeapInternalTimer > > 使用了PurgingTrigger 仍旧OOM,该问题应该如何排查呢,希望得到一些帮助 > > 代码: > //其中ShoppingRecords UserClickModel都是普通的bean对象 > inputStream > .filter(data -> "pv".equals(data.getBehavior())) > .keyBy(new KeySelector<ShoppingRecords, Tuple2<LocalDate, > Long>>() { > @Override > public Tuple2<LocalDate, Long> getKey(ShoppingRecords > value) throws Exception { > Instant instant = > Instant.ofEpochMilli(value.getTs()); > return Tuple2.of( > LocalDateTime.ofInstant(instant, > ZoneId.of("Asia/Shanghai")).toLocalDate(), > value.getItemId() > ); > } > }) > .window(TumblingEventTimeWindows.of(Time.days(1))) > // .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3))) > // .evictor(TimeEvictor.of(Time.seconds(0),true)) > > .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3)))) > .process(new ProcessWindowFunctionBitMap()) > // .addSink(new RedisSink<>(conf, new UvRedisSink())); > .addSink(new PrintSinkFunction()); > > public static class ProcessWindowFunctionBitMap > extends ProcessWindowFunction<ShoppingRecords, UserClickModel, > Tuple2<LocalDate, Long>, TimeWindow> { > > private transient ValueState<Integer> pvState; > private transient ValueState<Roaring64NavigableMap> bitMapState; > > @Override > public void open(Configuration parameters) throws 
Exception { > super.open(parameters); > ValueStateDescriptor<Integer> pvStateDescriptor = new > ValueStateDescriptor<>("pv", Integer.class); > ValueStateDescriptor<Roaring64NavigableMap> > bitMapStateDescriptor = new ValueStateDescriptor("bitMap" > , TypeInformation.of(new > TypeHint<Roaring64NavigableMap>() { > })); > // BlockingQueue > // 过期状态清除 > StateTtlConfig stateTtlConfig = StateTtlConfig > > .newBuilder(org.apache.flink.api.common.time.Time.days(1)) > > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > > .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) > .build(); > // 开启ttl > pvStateDescriptor.enableTimeToLive(stateTtlConfig); > bitMapStateDescriptor.enableTimeToLive(stateTtlConfig); > > pvState = this.getRuntimeContext().getState(pvStateDescriptor); > bitMapState = > this.getRuntimeContext().getState(bitMapStateDescriptor); > > } > > @Override > public void process(Tuple2<LocalDate, Long> key, > ProcessWindowFunction<ShoppingRecords, > UserClickModel, Tuple2<LocalDate, Long>, TimeWindow>.Context context, > Iterable<ShoppingRecords> elements, > Collector<UserClickModel> out) throws Exception { > // 当前状态的pv uv > Integer pv = pvState.value(); > Roaring64NavigableMap bitMap = bitMapState.value(); > if (bitMap == null) { > bitMap = new Roaring64NavigableMap(); > pv = 0; > } > > Iterator<ShoppingRecords> iterator = elements.iterator(); > while (iterator.hasNext()) { > pv = pv + 1; > long uid = iterator.next().getUser_id(); > //如果userId可以转成long > bitMap.add(uid); > } > > // 更新pv > pvState.update(pv); > > UserClickModel UserClickModel = new UserClickModel(); > > UserClickModel.setDate(key.f0.toString()); > UserClickModel.setProduct(key.f1); > UserClickModel.setPv(pv); > UserClickModel.setUv(bitMap.getIntCardinality()); > > out.collect(UserClickModel); > } > } >