Collect the elements to a list, then sort, then collect out. HG <hanspeter.sl...@gmail.com> 于2022年3月3日周四 22:13写道:
> Hi, > I have need to sort the input of the ProcesWindowFunction by one of the > fields of the Tuple4 that is in the Iterator. > > Any advice as to what the best way is? > > static class MyProcessWindowFunction extends > ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, > TimeWindow> { > @Override > public void process(String key, Context context, > Iterable<Tuple4<Long, Long, String, String>> input, Collector<String> out) > { > Long elapsed = 0L; > Long pHandlingTime = 0L; > Long totalElapsed = 0L > > System.out.println(input.getClass()); > > Iterator<Tuple4<Long, Long, String, String>> etter = > input.iterator(); > *for (Tuple4<Long, Long, String, String> in: input){* > transactionId = in.getField(2).toString(); > elapsed = Long.parseLong(in.getField(1).toString()) > - pHandlingTime; > totalElapsed = totalElapsed + elapsed; > pHandlingTime = Long.parseLong(in.getField(1).toString()) > > out.collect("Key : " + key + " Window : " + > context.window() + " transactionId : " + transactionId + " elapsed : " + > elapsed.toString() + " max handling time : " + h.toString() + " > totalElapsed " + totalElapsed); > } > } > } > > > Op do 3 mrt. 2022 om 15:12 schreef HG <hanspeter.sl...@gmail.com>: > >> Hi, >> I have need to sort the input of the ProcesWindowFunction by one of the >> fields of the Tuple4 that is in the Iterator. >> >> static class MyProcessWindowFunction extends >> ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, >> TimeWindow> { >> @Override >> public void process(String key, Context context, >> Iterable<Tuple4<Long, Long, String, String>> input, Collector<String> out) >> { >> Long elapsed = 0L; >> Long pHandlingTime = 0L; >> Long totalElapsed = 0L >> >> System.out.println(input.getClass()); >> >> Iterator<Tuple4<Long, Long, String, String>> etter = >> input.iterator(); >> *for (Tuple4<Long, Long, String, String> in: input){* >> transactionId = in.getField(2).toString(); >> elapsed = Long.parseLong(in.getField(1).toString()) >> - pHandlingTime; >> totalElapsed = totalElapsed + elapsed; >> pHandlingTime = Long.parseLong(in.getField(1).toString()) >> >> out.collect("Key : " + key + " Window : " + >> context.window() + " transactionId : " + transactionId + " elapsed : " + >> elapsed.toString() + " max handling time : " + h.toString() + " >> totalElapsed " + totalElapsed); >> } >> } >> } >> >