各位老师好:
执行下面的代码后，所有节点的磁盘都被占满；在本地调试时，C 盘也被占满了。
占用磁盘的文件（目录）名称：flink-netty-shuffle-b71f58a6-0fdb-437f-a6b8-6b0bbedd3dfa
说明：
1. 运行模式为批处理模式（RuntimeExecutionMode.BATCH）。
2. 本地测试时，输入目录 oneDay 和 long 的大小在 1G 左右。启动程序后，会把 C 盘（C:\Users\xxx\AppData\Local\Temp）剩余的几十 G 空间占满；部署到集群后，也会逐渐占满各节点的磁盘。
3. 广播流 blackListStream 大约有一万条记录。我尝试把 process 中获取广播变量的代码和 processBroadcastElement 方法都注释掉，问题仍然存在。
// Local test inputs; replace with the real (cluster) paths when deploying.
String oneDayLogFile = "C:\\Users\\xianghuibai\\Desktop\\oneDay";
String historyFileName = "C:\\Users\\xianghuibai\\Desktop\\long";

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
// NOTE(review): in BATCH mode, data exchanged between stages goes through blocking
// shuffles that are materialized on local disk (presumably the flink-netty-shuffle-*
// directories under io.tmp.dirs, which defaults to the JVM temp dir). Make sure
// io.tmp.dirs points at a volume large enough for the materialized inputs of
// this pipeline — TODO confirm which exchange produces the reported volume.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// Blacklisted ids loaded once from Redis at job-construction time.
// The typed fromCollection overload is used on purpose: the untyped overload infers
// the element type from the first element and throws IllegalArgumentException when
// the Redis set is empty, which would fail the whole job for an empty blacklist.
DataStream<String> blackListStream =
        env.fromCollection(
                RedisPool20484Utils.getCustomJedisCluster().smembers("user_blacklist_cid_test"),
                BasicTypeInfo.STRING_TYPE_INFO);

// Broadcast state: blacklisted id -> true (only key membership is used below).
MapStateDescriptor<String, Boolean> type =
        new MapStateDescriptor<String, Boolean>("blackList_type",
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
BroadcastStream<String> blackList_b = blackListStream.broadcast(type);

// Parses each line of the one-day log into its first five tab-separated fields.
DataStream<Tuple5<String, String, String, String, String>> oneDayLog =
        env.readTextFile(oneDayLogFile)
                .map(new MapFunction<String, Tuple5<String, String, String, String, String>>() {
                    @Override
                    public Tuple5<String, String, String, String, String> map(String line)
                            throws Exception {
                        // limit -1 keeps trailing empty columns; the default split drops them,
                        // so a row whose last field is empty would otherwise fail with an
                        // ArrayIndexOutOfBoundsException on arrs[4].
                        String[] arrs = line.split("\t", -1);
                        if (arrs.length < 5) {
                            throw new IllegalArgumentException(
                                    "Expected at least 5 tab-separated fields but got "
                                            + arrs.length + ": " + line);
                        }
                        return new Tuple5<>(arrs[0], arrs[1], arrs[2], arrs[3], arrs[4]);
                    }
                });

// History records (parsed by FlatParseLong) plus the one-day records, minus any
// record whose first field (f0) is present in the broadcast blacklist.
SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> dayOutput =
        env.readTextFile(historyFileName)
                .flatMap(new FlatParseLong())
                .union(oneDayLog)
                .connect(blackList_b)
                .process(new BroadcastProcessFunction<
                        Tuple5<String, String, String, String, String>,
                        String,
                        Tuple5<String, String, String, String, String>>() {
                    @Override
                    public void processElement(
                            Tuple5<String, String, String, String, String> value,
                            ReadOnlyContext ctx,
                            Collector<Tuple5<String, String, String, String, String>> out)
                            throws Exception {
                        // Flink's BATCH execution mode processes broadcast inputs before
                        // regular inputs, so the blacklist is complete at this point.
                        ReadOnlyBroadcastState<String, Boolean> blackList =
                                ctx.getBroadcastState(type);
                        if (value != null && !blackList.contains(value.f0)) {
                            out.collect(value);
                        }
                    }

                    @Override
                    public void processBroadcastElement(
                            String value,
                            Context ctx,
                            Collector<Tuple5<String, String, String, String, String>> out)
                            throws Exception {
                        // Empty ids are ignored; every other id is marked as blacklisted.
                        if (StringUtils.isNotEmpty(value)) {
                            ctx.getBroadcastState(type).put(value, true);
                        }
                    }
                });