Business scenario: the data volume is very large and the existing machine resources cannot process it in time, so we have to sample. However, part of the data must always be processed, so the source data falls into two classes:
1) records that match a whitelist in a dimension table are forwarded downstream directly;
2) the rest is sampled and then forwarded downstream.
The filtered data then goes through business ETL cleansing to produce the basic data; the whitelisted part of the basic data is written to storage directly, and the basic data is also aggregated over a window before being written to storage.
The whole pipeline looks like this:
1. sample = source
       .connect(ruleMap1) // join the dimension table: whitelist hits go straight downstream, everything else is sampled first
       .process(new BroadcastProcessFunction<String, List<String>, Row>() {
           private final int K = 10000;
           private Row[] array = new Row[K];                 // sampled rows waiting to be sent downstream
           private final AtomicInteger counter = new AtomicInteger(0);
           private long start = System.currentTimeMillis();
           private List<String> omgidList = new ArrayList<>();

           @Override
           public void processBroadcastElement(List<String> currentList, Context ctx,
                                               Collector<Row> out) throws Exception {
               omgidList = currentList;                      // refresh the whitelist from the broadcast side
           }

           @Override
           public void processElement(String input, ReadOnlyContext ctx,
                                       Collector<Row> out) throws Exception {
               Row row = new Row(2);
               row.setField(0, input);
               if (/* whitelist condition on omgidList */) { // 1) whitelist: tag "green" and emit immediately
                   row.setField(1, "green");
                   out.collect(row);
               }
               if (/* sampling condition */) {               // 2) sampling: tag "yellow" and buffer the row
                   row.setField(1, "yellow");
                   array[counter.getAndIncrement() % K] = row;
                   return;
               }
               long diff = System.currentTimeMillis() - start;
               if (diff / 1000 >= 60) {                      // 3) sampling window (1 min) elapsed: flush the
                   for (int k = 0; k < array.length; k++) {  //    buffered sample, then reset counter, timer, array
                       if (array[k] != null) {
                           out.collect(array[k]);
                       }
                   }
                   counter.set(0);
                   start = System.currentTimeMillis();
                   array = new Row[K];
               }
           }
       });
2. tmp = sample.connect(ruleMap2).process(ETL);
3. basic = tmp.filter(row -> "green".equals(row.getField(1))).map(...);
//basic.addSink()
4. aggr = tmp.flatMap(...).keyBy(random).window(5min).aggregate(sum).map(...); // spelled out in the sketch below
//aggr.addSink()
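
To make step 4 concrete, here is a minimal sketch of how that window aggregation might be spelled out with the DataStream API. It is only an illustration under assumptions: the (randomBucket, 1) output of the flatMap and the plain sum are placeholders standing in for whatever StatInfoFlatMapFunction actually emits, and Step4Sketch / buildAggr are hypothetical names, not part of the real job.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.concurrent.ThreadLocalRandom;

public class Step4Sketch {

    // Sketch of step 4: tmp.flatMap(...).keyBy(random).window(5min).aggregate(sum)
    public static DataStream<Long> buildAggr(DataStream<Row> tmp) {
        return tmp
                .flatMap(new FlatMapFunction<Row, Tuple2<Integer, Long>>() {
                    @Override
                    public void flatMap(Row row, Collector<Tuple2<Integer, Long>> out) {
                        // emit (randomBucket, 1); the random bucket plays the role of
                        // keyBy(random), spreading the window work across subtasks
                        out.collect(Tuple2.of(ThreadLocalRandom.current().nextInt(16), 1L));
                    }
                })
                .keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, Long> value) {
                        return value.f0;
                    }
                })
                // 5-minute tumbling processing-time window
                .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
                // sum the per-bucket counts
                .aggregate(new AggregateFunction<Tuple2<Integer, Long>, Long, Long>() {
                    @Override public Long createAccumulator() { return 0L; }
                    @Override public Long add(Tuple2<Integer, Long> value, Long acc) { return acc + value.f1; }
                    @Override public Long getResult(Long acc) { return acc; }
                    @Override public Long merge(Long a, Long b) { return a + b; }
                });
        // the trailing .map(...) and aggr.addSink(...) are omitted here
    }
}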
Shortly after the job starts it fails. The log says the IO buffer pool is exhausted ("Buffer pool is destroyed"), so the flatMap cannot forward elements to the next operator. We only sample 10,000 records per minute, so a 5-minute window holds at most 50,000 rows; the physical node has 3 GB of memory and checkpointing is disabled. I have searched for a long time without finding the cause. Is my sampling approach wrong? Any pointers from the experts would be greatly appreciated.
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:600)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:558)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:538)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:726)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:704)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.real.function.transform.StatInfoFlatMapFunction.flatMap(StatInfoFlatMapFunction.java:52)
    at com.real.function.transform.StatInfoFlatMapFunction.flatMap(StatInfoFlatMapFunction.java:21)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:583)
    ... 21 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:244)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:245)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:168)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:132)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    ... 36 more