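It was caused by the state accumulated within the 3-minute interval being too large. Both increasing the TM task heap and switching to RocksDBStateBackend solve it.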
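A minimal sketch of the RocksDB switch, assuming Flink 1.13 or later (where `EmbeddedRocksDBStateBackend` exists) and the `flink-statebackend-rocksdb` dependency on the classpath. The class name `RocksDbJobSetup` is hypothetical. If you stay on the heap backend instead, the TaskManager heap is a cluster setting rather than a job setting: raise `taskmanager.memory.task.heap.size` (or the overall `taskmanager.memory.process.size`) in flink-conf.yaml.

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical helper class; only the setStateBackend call matters.
public class RocksDbJobSetup {

    /**
     * Creates an environment whose keyed state lives in RocksDB (native memory plus
     * local disk) instead of the JVM heap, so window contents and ValueState no longer
     * appear as heap objects such as CopyOnWriteStateMap$StateMapEntry. In recent Flink
     * versions the RocksDB backend also stores event-time timers in RocksDB by default.
     */
    public static StreamExecutionEnvironment createEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // true = incremental checkpoints: each checkpoint uploads only changed files.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        return env;
    }
}
```

The rest of the pipeline (`inputStream.filter(...).keyBy(...).window(...)`) is then built on the returned `env` unchanged.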
On Mon, Mar 28, 2022 at 11:07, yue ma <mayuefi...@gmail.com> wrote:
> It looks like this is mainly because the state on the heap is too large. I'd suggest switching to RocksDBStateBackend.
>
> On Sun, Mar 27, 2022 at 17:07, yj h <hyjcry...@gmail.com> wrote:
>
> > I have a question about a TaskManager OOM. I'm computing daily UV and use ContinuousEventTimeTrigger to fire every 3 minutes.
> >
> > Configuration:
> > 2 machines with 2 cores each, 2 slots per TaskManager, parallelism 4. JobManager and TaskManager memory are left at the defaults.
> >
> > Troubleshooting steps so far:
> > 1. At first I only called .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3))), and it ran out of memory very quickly.
> > 2. Added an evictor:
> >    .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3)))
> >    .evictor(TimeEvictor.of(Time.seconds(0), true))
> > 3. Used .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3))))
> >
> > Approaches 2 and 3 also OOM. jstat shows full GCs happening constantly, and the checkpoint size keeps growing, peaking at roughly 300 MB.
> >
> > In the heap dump analyzed with MAT, the largest memory consumers are
> > org.apache.flink.runtime.state.heap.CopyOnWriteStateMap$StateMapEntry and
> > org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
> >
> > Even with PurgingTrigger it still OOMs. How should I troubleshoot this? Any help would be appreciated.
> >
> > Code:
> > // ShoppingRecords and UserClickModel are both plain bean classes
> > inputStream
> >         .filter(data -> "pv".equals(data.getBehavior()))
> >         .keyBy(new KeySelector<ShoppingRecords, Tuple2<LocalDate, Long>>() {
> >             @Override
> >             public Tuple2<LocalDate, Long> getKey(ShoppingRecords value) throws Exception {
> >                 Instant instant = Instant.ofEpochMilli(value.getTs());
> >                 return Tuple2.of(
> >                         LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai")).toLocalDate(),
> >                         value.getItemId()
> >                 );
> >             }
> >         })
> >         .window(TumblingEventTimeWindows.of(Time.days(1)))
> > //      .trigger(ContinuousEventTimeTrigger.of(Time.minutes(3)))
> > //      .evictor(TimeEvictor.of(Time.seconds(0), true))
> >         .trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.minutes(3))))
> >         .process(new ProcessWindowFunctionBitMap())
> > //      .addSink(new RedisSink<>(conf, new UvRedisSink()));
> >         .addSink(new PrintSinkFunction());
> >
> > public static class ProcessWindowFunctionBitMap
> >         extends ProcessWindowFunction<ShoppingRecords, UserClickModel,
> >         Tuple2<LocalDate, Long>, TimeWindow> {
> >
> >     private transient ValueState<Integer> pvState;
> >     private transient ValueState<Roaring64NavigableMap> bitMapState;
> >
> >     @Override
> >     public void open(Configuration parameters) throws Exception {
> >         super.open(parameters);
> >         ValueStateDescriptor<Integer> pvStateDescriptor =
> >                 new ValueStateDescriptor<>("pv", Integer.class);
> >         ValueStateDescriptor<Roaring64NavigableMap> bitMapStateDescriptor =
> >                 new ValueStateDescriptor<>("bitMap",
> >                         TypeInformation.of(new TypeHint<Roaring64NavigableMap>() {}));
> >         // Expire state after one day
> >         StateTtlConfig stateTtlConfig = StateTtlConfig
> >                 .newBuilder(org.apache.flink.api.common.time.Time.days(1))
> >                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >                 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> >                 .build();
> >         // Enable TTL on both states
> >         pvStateDescriptor.enableTimeToLive(stateTtlConfig);
> >         bitMapStateDescriptor.enableTimeToLive(stateTtlConfig);
> >
> >         pvState = this.getRuntimeContext().getState(pvStateDescriptor);
> >         bitMapState = this.getRuntimeContext().getState(bitMapStateDescriptor);
> >     }
> >
> >     @Override
> >     public void process(Tuple2<LocalDate, Long> key,
> >                         Context context,
> >                         Iterable<ShoppingRecords> elements,
> >                         Collector<UserClickModel> out) throws Exception {
> >         // PV and UV accumulated so far for this key
> >         Integer pv = pvState.value();
> >         Roaring64NavigableMap bitMap = bitMapState.value();
> >         if (bitMap == null) {
> >             bitMap = new Roaring64NavigableMap();
> >             pv = 0;
> >         }
> >
> >         for (ShoppingRecords record : elements) {
> >             pv = pv + 1;
> >             // Assumes the user id can be represented as a long
> >             long uid = record.getUser_id();
> >             bitMap.add(uid);
> >         }
> >
> >         // Persist both PV and the bitmap; if bitMapState is never written,
> >         // bitMapState.value() stays null and PV/UV restart from zero on every firing
> >         pvState.update(pv);
> >         bitMapState.update(bitMap);
> >
> >         UserClickModel userClickModel = new UserClickModel();
> >         userClickModel.setDate(key.f0.toString());
> >         userClickModel.setProduct(key.f1);
> >         userClickModel.setPv(pv);
> >         userClickModel.setUv(bitMap.getIntCardinality());
> >
> >         out.collect(userClickModel);
> >     }
> > }
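Step 3 in the question relies on PurgingTrigger, which turns each 3-minute FIRE into FIRE_AND_PURGE: the window buffer is cleared after every firing, so the next call to `process()` sees only the elements that arrived since. The running PV/UV totals therefore have to be carried in keyed state and written back each time. The sketch below simulates one key without Flink; `PurgingUvSimulation` is hypothetical, its two fields stand in for `pvState` and `bitMapState`, and a `HashSet` replaces `Roaring64NavigableMap`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, Flink-free stand-in for one key of ProcessWindowFunctionBitMap.
public class PurgingUvSimulation {

    private Integer pvState;       // stands in for ValueState<Integer>
    private Set<Long> bitMapState; // stands in for ValueState<Roaring64NavigableMap>

    /**
     * Simulates one trigger firing. Because the window is purged after each firing,
     * only the user ids buffered since the previous firing are passed in.
     * Returns {pv, uv} for the day so far.
     */
    public int[] fire(List<Long> userIdsSinceLastPurge) {
        Integer pv = pvState;
        Set<Long> bitMap = bitMapState;
        if (bitMap == null) {
            bitMap = new HashSet<>();
            pv = 0;
        }
        for (long uid : userIdsSinceLastPurge) {
            pv = pv + 1;      // PV counts every click
            bitMap.add(uid);  // UV counts each user id once
        }
        // Write both totals back; if bitMapState were never written, it would stay
        // null and pv/uv would restart from zero on every firing.
        pvState = pv;
        bitMapState = bitMap;
        return new int[] {pv, bitMap.size()};
    }
}
```

Two firings with user ids `[1, 2, 1]` and then `[2, 3]` return `{3, 2}` and then `{5, 3}`: PV keeps growing across purges while user 2 is counted only once in UV.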