operator操作:processWindowFunction的代码如下:
class MyProcessWindowFuncation extends
ProcessWindowFunction<ShareRealTimeData, TreeMap<Double,
Tuple2<String, String>>, String, TimeWindow>{
private transient MapState<String, Tuple2<String, Double>>
eveShareNoMaxPrice;
private transient ValueState<TreeMap<Double, Tuple2<String,
String>>> shareAndMaxPrice;
@Override
public void process(String s, Context context,
Iterable<ShareRealTimeData> elements, Collector<TreeMap<Double,
Tuple2<String, String>>> out) throws Exception {
Iterator<ShareRealTimeData> iterator = elements.iterator();
//得到每trigger周期内每个shareNo的最大值
while (iterator.hasNext()) {
ShareRealTimeData next = iterator.next();
Tuple2<String, Double> t2 =
eveShareNoMaxPrice.get(next.getShareNo());
if (t2 == null || t2.f1 < next.getCurrentPrice()) {
eveShareNoMaxPrice.put(next.getShareNo(),
Tuple2.of(next.getShareName(), next.getCurrentPrice()));
}
}
TreeMap<Double, Tuple2<String, String>> shareAndMaxPriceV =
shareAndMaxPrice.value();
if (shareAndMaxPriceV == null) {
shareAndMaxPriceV = new TreeMap(new Comparator<Double>() {
@Override
public int compare(Double o1, Double o2) {
return Double.compare(o2, o1);
}
});
}
Iterator<Map.Entry<String, Tuple2<String, Double>>>
keysAndMaxPrice = eveShareNoMaxPrice.entries().iterator();
while (keysAndMaxPrice.hasNext()) {
Map.Entry<String, Tuple2<String, Double>> next =
keysAndMaxPrice.next();
shareAndMaxPriceV.put(next.getValue().f1,
Tuple2.of(next.getKey(), next.getValue().f0));
if (shareAndMaxPriceV.size() > 20) {
shareAndMaxPriceV.pollLastEntry();
}
}
eveShareNoMaxPrice.clear();
shareAndMaxPrice.clear();
out.collect(shareAndMaxPriceV);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
eveShareNoMaxPrice = getRuntimeContext().getMapState(new
MapStateDescriptor<String, Tuple2<String, Double>>("eveShareNoMaxPrice",
TypeInformation.of(new TypeHint<String>() {
}), TypeInformation.of(new TypeHint<Tuple2<String, Double>>()
{
})));
shareAndMaxPrice = getRuntimeContext().getState(new
ValueStateDescriptor<TreeMap<Double, Tuple2<String,
String>>>("shareAndMaxPrice", TypeInformation.of(new
TypeHint<TreeMap<Double, Tuple2<String, String>>>() {
})));
}
}
--
Sent from: http://apache-flink.147419.n8.nabble.com/