这个问题en...出在如下地方:
KeyedStream<ShareRealTimeData, String> keyByStream =
signoutTimeAndWM.keyBy(new KeySelector<ShareRealTimeData, String>() {
@Override
public String getKey(ShareRealTimeData value) throws Exception {
return DateUtilMinutes.timeStampToDate(new
Date().getTime()); // 此处,不可以使用new Date这种当前时间。
}
});
修改,如果非要实现这种效果,可以先通过flatMap方式,针对每个元素 new Date
然后将这个date设置到ShareRealTimeData类中的一个字段(比如叫做key)。
然后再 keyBy(e->e.getKey()) 基于key这个字段做keyBy,效果一样,但不会出你这个问题。
原理比较复杂,和Flink的key分发机制有关,你这种写法会导致一个元素的key不稳定,因为实际就是<随机>的key。
lp <[email protected]> 于2021年1月5日周二 下午8:11写道:
> operator操作:processWindowFunction的代码如下:
>
> class MyProcessWindowFuncation extends
> ProcessWindowFunction<ShareRealTimeData, TreeMap<Double,
> Tuple2<String, String>>, String, TimeWindow>{
> private transient MapState<String, Tuple2<String, Double>>
> eveShareNoMaxPrice;
> private transient ValueState<TreeMap<Double, Tuple2<String,
> String>>> shareAndMaxPrice;
>
>
> @Override
> public void process(String s, Context context,
> Iterable<ShareRealTimeData> elements, Collector<TreeMap<Double,
> Tuple2<String, String>>> out) throws Exception {
> Iterator<ShareRealTimeData> iterator = elements.iterator();
>
> //得到每trigger周期内每个shareNo的最大值
> while (iterator.hasNext()) {
> ShareRealTimeData next = iterator.next();
> Tuple2<String, Double> t2 =
> eveShareNoMaxPrice.get(next.getShareNo());
> if (t2 == null || t2.f1 < next.getCurrentPrice()) {
> eveShareNoMaxPrice.put(next.getShareNo(),
> Tuple2.of(next.getShareName(), next.getCurrentPrice()));
> }
> }
>
>
> TreeMap<Double, Tuple2<String, String>> shareAndMaxPriceV =
> shareAndMaxPrice.value();
> if (shareAndMaxPriceV == null) {
> shareAndMaxPriceV = new TreeMap(new Comparator<Double>() {
> @Override
> public int compare(Double o1, Double o2) {
> return Double.compare(o2, o1);
> }
> });
> }
> Iterator<Map.Entry<String, Tuple2<String, Double>>>
> keysAndMaxPrice = eveShareNoMaxPrice.entries().iterator();
> while (keysAndMaxPrice.hasNext()) {
> Map.Entry<String, Tuple2<String, Double>> next =
> keysAndMaxPrice.next();
>
> shareAndMaxPriceV.put(next.getValue().f1,
> Tuple2.of(next.getKey(), next.getValue().f0));
> if (shareAndMaxPriceV.size() > 20) {
> shareAndMaxPriceV.pollLastEntry();
> }
> }
>
> eveShareNoMaxPrice.clear();
> shareAndMaxPrice.clear();
>
> out.collect(shareAndMaxPriceV);
> }
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> eveShareNoMaxPrice = getRuntimeContext().getMapState(new
> MapStateDescriptor<String, Tuple2<String, Double>>("eveShareNoMaxPrice",
> TypeInformation.of(new TypeHint<String>() {
> }), TypeInformation.of(new TypeHint<Tuple2<String,
> Double>>()
> {
> })));
> shareAndMaxPrice = getRuntimeContext().getState(new
> ValueStateDescriptor<TreeMap<Double, Tuple2<String,
> String>>>("shareAndMaxPrice", TypeInformation.of(new
> TypeHint<TreeMap<Double, Tuple2<String, String>>>() {
> })));
> }
> }
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>