Hi Suman,

> But I am always seeing that in the following code of
> `AbstractMapBundleOperator.java`, `numOfElements` is always 0.

That is strange. Please set a breakpoint at the line `bundleTrigger.onElement(input);` in the `processElement` method to see what happens when a record is processed.

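For comparison while stepping through with the debugger, here is a minimal plain-Java sketch of the counting logic quoted in this thread. It is not Flink code: the class name `CountTriggerSketch` and the `trace` helper are made up for illustration, and the flush is reduced to a counter. It only shows what this logic does in isolation: with a `maxCount` above 1, the counter reads 0 right after a flush and is non-zero in between.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java stand-in for the count-trigger logic quoted in this thread; not Flink code. */
public class CountTriggerSketch {
    private final long maxCount;
    private long count = 0;
    private int flushes = 0;

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Mirrors the quoted onElement: increment, then flush and reset once maxCount is reached. */
    void onElement() {
        count++;
        if (count >= maxCount) {
            flushes++; // stands in for callback.finishBundle()
            count = 0; // stands in for reset()
        }
    }

    /** Feeds n elements and records the counter value observed after each call. */
    static List<Long> trace(long maxCount, int n) {
        CountTriggerSketch trigger = new CountTriggerSketch(maxCount);
        List<Long> seen = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            trigger.onElement();
            seen.add(trigger.count);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(trace(3, 7)); // prints [1, 2, 0, 1, 2, 0, 1]
    }
}
```

If the breakpoint shows 0 on every hit with a trigger size of 10, the behavior differs from this isolated logic, so it is worth checking which operator instance the debugger is inspecting and whether `processElement` is reached at all.
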
> One more question, you mentioned that I need to test with `LinkedHashMap`
> instead of `HashMap`. Where should I make this change?

You could copy the class `AbstractMapBundleOperator` and update the bundle initialization code in its `open` method. Also, `MapBundleFunction`, `MapBundleOperator`, and `CountBundleTrigger` are not marked as `@Public`, so they carry no compatibility guarantee. It is safer to copy them for your own use.

Best,
JING ZHANG

suman shil <cncf.s...@gmail.com> wrote on Fri, Aug 20, 2021 at 2:18 PM:

> Hi Jing,
> I tried using `MapBundleOperator` as well (I have yet to test with
> `LinkedHashMap`), but in the following code of
> `AbstractMapBundleOperator.java`, `numOfElements` is always 0. It is
> never incremented. I replaced `TaxiFareStream` with
> `MapBundleOperator` in the above code. It should increment by 1
> each time an element is processed, but that is not happening.
>
>     public void processElement(StreamRecord<IN> element) throws Exception {
>         // get the key and value for the map bundle
>         final IN input = element.getValue();
>         final K bundleKey = getKey(input);
>         final V bundleValue = bundle.get(bundleKey);
>
>         // get a new value after adding this element to bundle
>         final V newBundleValue = function.addInput(bundleValue, input);
>
>         // update to map bundle
>         bundle.put(bundleKey, newBundleValue);
>
>         numOfElements++;
>         bundleTrigger.onElement(input);
>     }
>
> One more question, you mentioned that I need to test with `LinkedHashMap`
> instead of `HashMap`. Where should I make this change? Do I need to
> create a class which extends `MapBundleOperator` and add it there?
>
> Thanks
>
> On Thu, Aug 19, 2021 at 9:58 PM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Suman,
>> Please try copying `MapBundleOperator` and changing the `HashMap` to
>> `LinkedHashMap` to keep the output sequence consistent with the input
>> sequence.
>>
>> Best,
>> JING ZHANG
>>
>> suman shil <cncf.s...@gmail.com> wrote on Fri, Aug 20, 2021 at 2:23 AM:
>>
>>> Hi Jing,
>>> Thanks for looking into this. Here is the code of `TaxiFareStream`. I
>>> was following this link:
>>> http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
>>> Please let me know if there is any other way of aggregating elements
>>> locally.
>>>
>>>     public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
>>>         private KeySelector<TaxiFare, Long> keySelector;
>>>
>>>         public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
>>>                               BundleTrigger<TaxiFare> bundleTrigger,
>>>                               KeySelector<TaxiFare, Long> keySelector) {
>>>             super(userFunction, bundleTrigger, keySelector);
>>>             this.keySelector = keySelector;
>>>         }
>>>
>>>         @Override
>>>         protected Long getKey(TaxiFare input) throws Exception {
>>>             return keySelector.getKey(input);
>>>         }
>>>     }
>>>
>>> Thanks
>>>
>>> On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG <beyond1...@gmail.com> wrote:
>>>
>>>> Hi Suman,
>>>> Would you please provide the code of `TaxiFareStream`? It seems
>>>> we could use `MapBundleOperator` directly here.
>>>> BTW, I have some concerns about using this solution to do
>>>> local aggregation for window aggregation, because `MapBundleOperator`
>>>> saves input data in a bundle that is a `HashMap` object, which does
>>>> not preserve the input order. I'm afraid the records within a bundle
>>>> (10 in your case) may come out unordered. I'm not sure whether it
>>>> is reasonable to assign a watermark based on unordered
>>>> timestamps.
>>>>
>>>> Best,
>>>> JING ZHANG
>>>>
>>>> suman shil <cncf.s...@gmail.com> wrote on Thu, Aug 19, 2021 at 12:43 PM:
>>>>
>>>>> I am trying to do pre-shuffle aggregation in Flink. Following is the
>>>>> MapBundle implementation.
>>>>>
>>>>>     public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>>>>>         @Override
>>>>>         public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>>>>>             if (value == null) {
>>>>>                 return input;
>>>>>             }
>>>>>             value.tip = value.tip + input.tip;
>>>>>             return value;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
>>>>>             for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>>>>>                 out.collect(entry.getValue());
>>>>>             }
>>>>>         }
>>>>>     }
>>>>>
>>>>> I am using "CountBundleTrigger.java". But the pre-shuffle aggregation
>>>>> is not working as the `count` variable is always 0. Please let me
>>>>> know if I am missing something.
>>>>>
>>>>>     @Override
>>>>>     public void onElement(T element) throws Exception {
>>>>>         count++;
>>>>>         if (count >= maxCount) {
>>>>>             callback.finishBundle();
>>>>>             reset();
>>>>>         }
>>>>>     }
>>>>>
>>>>> Here is the main code.
>>>>>
>>>>>     MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
>>>>>     BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
>>>>>     KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
>>>>>         @Override
>>>>>         public Long getKey(TaxiFare value) throws Exception {
>>>>>             return value.driverId;
>>>>>         }
>>>>>     };
>>>>>
>>>>>     DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>>>>>     //      fares.keyBy((TaxiFare fare) -> fare.driverId)
>>>>>     //          .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());;
>>>>>             fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>>>>>                     new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>>>>>                 .assignTimestampsAndWatermarks(
>>>>>                     new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>>>>>                         @Override
>>>>>                         public long extractTimestamp(TaxiFare element) {
>>>>>                             return element.startTime.getEpochSecond();
>>>>>                         }
>>>>>                     })
>>>>>                 .keyBy((TaxiFare fare) -> fare.driverId)
>>>>>                 .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>>>                 .process(new AddTips());
>>>>>
>>>>>     DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>>>>>             hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>>>>>
>>>>> Thanks
>>>>> Suman
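
On the `HashMap` versus `LinkedHashMap` point discussed in this thread, the following is a small plain-Java sketch, not Flink code, of why the map type matters when a bundle is flushed. The class `BundleOrderSketch`, the `flushOrder` helper, and the sample driver ids and tips are all hypothetical. It accumulates tips per driver id, in the spirit of the `addInput` logic above, and returns the order in which keys come back on iteration. Which field of a copied operator to change is not shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: compares iteration order of two map types used as a bundle. */
public class BundleOrderSketch {

    /**
     * Sums tips per driver id into the given map and returns the key order
     * seen when iterating, which is the order a flush loop would emit in.
     */
    static List<Long> flushOrder(Map<Long, Float> bundle, long[] driverIds, float[] tips) {
        for (int i = 0; i < driverIds.length; i++) {
            // merge() inserts the tip for a new key or adds it to the existing sum
            bundle.merge(driverIds[i], tips[i], Float::sum);
        }
        return new ArrayList<>(bundle.keySet());
    }

    public static void main(String[] args) {
        long[] ids = {30L, 10L, 30L, 20L};   // hypothetical driver ids in arrival order
        float[] tips = {1f, 2f, 3f, 4f};     // hypothetical tips

        // LinkedHashMap iterates in first-insertion order: [30, 10, 20]
        System.out.println(flushOrder(new LinkedHashMap<>(), ids, tips));

        // HashMap makes no ordering guarantee, so this may differ from arrival order
        System.out.println(flushOrder(new HashMap<>(), ids, tips));
    }
}
```

Updating an existing key does not move it in a default `LinkedHashMap`, so the flush order follows the first arrival of each key rather than the most recent one.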