operator操作:processWindowFunction的代码如下:

class MyProcessWindowFuncation extends
ProcessWindowFunction<ShareRealTimeData, TreeMap&lt;Double,
Tuple2&lt;String, String>>, String, TimeWindow>{
        private transient MapState<String, Tuple2&lt;String, Double>>
eveShareNoMaxPrice;
        private transient ValueState<TreeMap&lt;Double, Tuple2&lt;String,
String>>> shareAndMaxPrice;


    @Override
        public void process(String s, Context context,
Iterable<ShareRealTimeData> elements, Collector<TreeMap&lt;Double,
Tuple2&lt;String, String>>> out) throws Exception {
            Iterator<ShareRealTimeData> iterator = elements.iterator();

            //得到每trigger周期内每个shareNo的最大值
            while (iterator.hasNext()) {
                ShareRealTimeData next = iterator.next();
                Tuple2<String, Double> t2 =
eveShareNoMaxPrice.get(next.getShareNo());
                if (t2 == null || t2.f1 < next.getCurrentPrice()) {
                    eveShareNoMaxPrice.put(next.getShareNo(),
Tuple2.of(next.getShareName(), next.getCurrentPrice()));
                }
            }


            TreeMap<Double, Tuple2&lt;String, String>> shareAndMaxPriceV =
shareAndMaxPrice.value();
            if (shareAndMaxPriceV == null) {
                shareAndMaxPriceV = new TreeMap(new Comparator<Double>() {
                    @Override
                    public int compare(Double o1, Double o2) {
                        return Double.compare(o2, o1);
                    }
                });
            }
            Iterator<Map.Entry&lt;String, Tuple2&lt;String, Double>>>
keysAndMaxPrice = eveShareNoMaxPrice.entries().iterator();
            while (keysAndMaxPrice.hasNext()) {
                Map.Entry<String, Tuple2&lt;String, Double>> next =
keysAndMaxPrice.next();

                shareAndMaxPriceV.put(next.getValue().f1,
Tuple2.of(next.getKey(), next.getValue().f0));
                if (shareAndMaxPriceV.size() > 20) {
                    shareAndMaxPriceV.pollLastEntry();
                }
            }

            eveShareNoMaxPrice.clear();
            shareAndMaxPrice.clear();

            out.collect(shareAndMaxPriceV);
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            eveShareNoMaxPrice = getRuntimeContext().getMapState(new
MapStateDescriptor<String, Tuple2&lt;String, Double>>("eveShareNoMaxPrice",
TypeInformation.of(new TypeHint<String>() {
            }), TypeInformation.of(new TypeHint<Tuple2&lt;String, Double>>()
{
            })));
            shareAndMaxPrice = getRuntimeContext().getState(new
ValueStateDescriptor<TreeMap&lt;Double, Tuple2&lt;String,
String>>>("shareAndMaxPrice", TypeInformation.of(new
TypeHint<TreeMap&lt;Double, Tuple2&lt;String, String>>>() {
            })));
        }
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复