您好!
重载procesElement方法,每条stream数据处理时,put数据进入map,后面每条数据处理时先判断是否在map中有相同的key值,无则新增后put进map,有则加工后put进map。
在实际代码中发现,每次写入put后,map有数据,但处理下一条数据时,先从map读取数据,发现map已经为空。
请问是哪里写错了吗? 和 flink官网中 state.update(current);有什么不同吗?
以下为代码:
private MapState<String, MicsTrainPractical> map; //定义map
@Override
public void processElement(MicsTrainPracticalDetail value, Context ctx,
Collector<MicsTrainPractical> out) throws Exception {
MicsTrainPractical current = map.get(value.getTrainNumber());
System.out.println(map.isEmpty()); //
每次数据进来都发现map已经为空,不能保存前面数据进来时put的数据
Long departTime =
DateTimeUtils.convertToTimestampSecond(value.getDepartTime());
Long arrivalTime =
DateTimeUtils.convertToTimestampSecond(value.getArrivalTime());
if (current == null) { //如果map中没有获取到string相同的数据,则新建一条数据put进去
MicsTrainPractical actual = new MicsTrainPractical();
actual.setTrainId(value.getTrainNumber());
actual.setCarId(value.getCarId());
actual.setStartStationId(value.getStationId());
actual.setStartPracticalArrivalTime(arrivalTime);
actual.setEndPracticalArrivalTime(arrivalTime);
actual.setStartPracticalDepartTime(departTime);
actual.setEndPracticalDepartTime(departTime);
actual.setEndStationId(value.getStationId());
actual.setStopTime(0L);
actual.setDs(value.getDs());
actual.setIsInsert(true);
actual.setTargetStationId(value.getTargetStationId());
out.collect(actual);
map.put(value.getTrainNumber(), actual); //向map中写入数据
return;
} else { //如果map有获取到string相同的数据,则转换数据后写入map
MicsTrainPractical actual = new MicsTrainPractical();
actual.setTrainId(value.getTrainNumber());
actual.setCarId(value.getCarId());
actual.setStartStationId(current.getStartStationId());
actual.setEndStationId(value.getStationId());
actual.setStartPracticalArrivalTime(current.getStartPracticalArrivalTime());
actual.setStartPracticalDepartTime(current.getStartPracticalDepartTime());
actual.setEndPracticalArrivalTime(arrivalTime);
actual.setEndPracticalDepartTime(departTime);
actual.setStopTime(current.getStopTime() + Math.abs(departTime -
arrivalTime));
actual.setDs(value.getDs());
actual.setIsInsert(false);
actual.setTargetStationId(value.getTargetStationId());
current.setEndStationId(actual.getEndStationId());
current.setEndPracticalDepartTime(actual.getEndPracticalDepartTime());
current.setEndPracticalArrivalTime(actual.getEndPracticalArrivalTime());
current.setStopTime(actual.getStopTime());
current.setIsInsert(actual.getIsInsert());
MicsTrainSectionInfo trainSectionInfo = new MicsTrainSectionInfo();
trainSectionInfo.setTrainId(actual.getTrainId());
trainSectionInfo.setCarId(actual.getCarId());
trainSectionInfo.setStartStationId(current.getEndStationId());
trainSectionInfo.setEndStationId(actual.getEndStationId());
trainSectionInfo.setDepartTime(current.getEndPracticalDepartTime());
trainSectionInfo.setArrivalTime(actual.getEndPracticalArrivalTime());
map.put(value.getTrainNumber(), actual);
out.collect(actual);
ctx.output(outputTagSectionFlow, trainSectionInfo);
}
}
谌祖安
智能轨道交通业务群 / 产品事业部 / 开发经理
Intelligent RailTransportation BG /Development Manager
广东省广州市天河区新岑四路2号佳都智慧大厦
PCI Intelligence Building, No. 2 Xincen Fourth Road, Tianhe District,
Guangzhou, Guangdong
E [email protected]
M 86-18680458868
www.pcitech.com