Hi Suman,
> But I am always seeing that `numOfElements` in
> `AbstractMapBundleOperator.java` is always 0.
That is odd. Please set a breakpoint at the line
`bundleTrigger.onElement(input);` in the `processElement` method to see
what happens when a record is processed.
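
For reference, here is a minimal, self-contained sketch of what I would expect you to observe at that breakpoint. It is not Flink's code: `CountTriggerSketch` and `countsAfterEachElement` are hypothetical names, and it assumes that `reset()` simply sets the counter back to 0. It only mirrors the count, fire, reset logic of the `onElement` method quoted further down in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a count-based bundle trigger; not Flink's class.
class CountTriggerSketch {
    private final long maxCount;
    private final Runnable onFinishBundle;
    private long count = 0;

    CountTriggerSketch(long maxCount, Runnable onFinishBundle) {
        this.maxCount = maxCount;
        this.onFinishBundle = onFinishBundle;
    }

    // Count the element, fire the callback at maxCount, then reset.
    void onElement() {
        count++;
        if (count >= maxCount) {
            onFinishBundle.run();
            count = 0; // assumption: this is all reset() does
        }
    }

    long getCount() {
        return count;
    }

    // Feeds n elements and records the counter value after each one.
    static List<Long> countsAfterEachElement(long maxCount, int n) {
        CountTriggerSketch trigger = new CountTriggerSketch(maxCount, () -> { });
        List<Long> seen = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            trigger.onElement();
            seen.add(trigger.getCount());
        }
        return seen;
    }

    public static void main(String[] args) {
        // With maxCount = 3 the counter climbs and is 0 only right after a flush.
        System.out.println(countsAfterEachElement(3, 7)); // [1, 2, 0, 1, 2, 0, 1]
    }
}
```

If the counter in your job never leaves 0 between flushes, the breakpoint should show whether `processElement` is reached at all on the instance you are inspecting.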

> One more question, you mentioned that I need to test with `LinkedHashMap`
> instead of `HashMap`. Where should I make this change?
You could copy the class `AbstractMapBundleOperator` and change the
bundle initialization in its `open` method.
Also, `MapBundleFunction`, `MapBundleOperator`, and `CountBundleTrigger` are
not marked as `@Public`, so they carry no compatibility guarantee.
It would be safer to copy them for your own use.
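
To illustrate why the map type matters, here is a small standalone sketch. It does not use the Flink operator: `OrderedBundleSketch` and `aggregateAndListKeys` are hypothetical names, and the driver ids and tips are made-up sample values. It sums values per key into whichever map is passed in and then lists the keys in iteration order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a per-key bundle; not the Flink operator itself.
class OrderedBundleSketch {

    // Sums tips per key into the given map and returns the keys in iteration order.
    static List<Long> aggregateAndListKeys(Map<Long, Float> bundle, long[] keys, float[] tips) {
        for (int i = 0; i < keys.length; i++) {
            bundle.merge(keys[i], tips[i], Float::sum);
        }
        return new ArrayList<>(bundle.keySet());
    }

    public static void main(String[] args) {
        long[] driverIds = {2013000042L, 7L, 2013000001L, 7L}; // made-up sample ids
        float[] tips = {1.0f, 2.0f, 3.0f, 4.0f};

        // LinkedHashMap iterates keys in first-insertion order.
        System.out.println(aggregateAndListKeys(new LinkedHashMap<>(), driverIds, tips));
        // prints [2013000042, 7, 2013000001]

        // HashMap gives no ordering guarantee, so this may differ from the input order.
        System.out.println(aggregateAndListKeys(new HashMap<>(), driverIds, tips));
    }
}
```

In a copied operator, the corresponding change would be the single line that creates the bundle map.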

Best,
JING ZHANG

suman shil <cncf.s...@gmail.com> wrote on Fri, Aug 20, 2021 at 2:18 PM:

> Hi Jing,
> I also tried using `MapBundleOperator` (I have yet to test with
> `LinkedHashMap`). But I always see that `numOfElements` in
> `AbstractMapBundleOperator.java` stays at 0. It is never incremented.
> I replaced `TaxiFareStream` with `MapBundleOperator` in the above code.
> It should increase by 1 each time an element is processed, but that is
> not happening.
>
>     public void processElement(StreamRecord<IN> element) throws Exception {
>         // get the key and value for the map bundle
>         final IN input = element.getValue();
>         final K bundleKey = getKey(input);
>         final V bundleValue = bundle.get(bundleKey);
>
>         // get a new value after adding this element to bundle
>         final V newBundleValue = function.addInput(bundleValue, input);
>
>         // update to map bundle
>         bundle.put(bundleKey, newBundleValue);
>
>         numOfElements++;
>         bundleTrigger.onElement(input);
>     }
>
> One more question, you mentioned that I need to test with `LinkedHashMap`
> instead of `HashMap`. Where should I make this change? Do I need to
> create a class that extends `MapBundleOperator` and add it there?
>
> Thanks
>
>
> On Thu, Aug 19, 2021 at 9:58 PM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Suman,
>> Please try copying `MapBundleOperator` and changing the `HashMap` to a
>> `LinkedHashMap`, so that the output order matches the input order.
>>
>> Best,
>> JING ZHANG
>>
>> suman shil <cncf.s...@gmail.com> wrote on Fri, Aug 20, 2021 at 2:23 AM:
>>
>>> Hi Jing,
>>> Thanks for looking into this. Here is the code of `TaxiFareStream`. I
>>> was following this link:
>>> http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
>>> Please let me know if there is any other way of aggregating elements
>>> locally.
>>>
>>>     public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
>>>         private KeySelector<TaxiFare, Long> keySelector;
>>>
>>>         public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
>>>                               BundleTrigger<TaxiFare> bundleTrigger,
>>>                               KeySelector<TaxiFare, Long> keySelector) {
>>>             super(userFunction, bundleTrigger, keySelector);
>>>             this.keySelector = keySelector;
>>>         }
>>>
>>>         @Override
>>>         protected Long getKey(TaxiFare input) throws Exception {
>>>             return keySelector.getKey(input);
>>>         }
>>>     }
>>>
>>> Thanks
>>>
>>> On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG <beyond1...@gmail.com> wrote:
>>>
>>>> Hi Suman,
>>>> Could you please share the code of `TaxiFareStream`? It seems
>>>> we could use `MapBundleOperator` directly here.
>>>> BTW, I have some concerns about using this approach for local
>>>> aggregation ahead of a window aggregation, because `MapBundleOperator`
>>>> buffers input data in a bundle backed by a `HashMap`, which does not
>>>> preserve the input order. I'm afraid the records within a bundle
>>>> (10 in your case) could come out unordered. I'm not sure whether it
>>>> is reasonable to assign watermarks based on unordered timestamps.
>>>>
>>>> Best,
>>>> JING ZHANG
>>>>
>>>>
>>>>
>>>> suman shil <cncf.s...@gmail.com> wrote on Thu, Aug 19, 2021 at 12:43 PM:
>>>>
>>>>> I am trying to do pre-shuffle aggregation in Flink. The following is the
>>>>> MapBundle implementation.
>>>>>
>>>>>     public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>>>>>         @Override
>>>>>         public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>>>>>             if (value == null) {
>>>>>                 return input;
>>>>>             }
>>>>>             value.tip = value.tip + input.tip;
>>>>>             return value;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
>>>>>             for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>>>>>                 out.collect(entry.getValue());
>>>>>             }
>>>>>         }
>>>>>     }
>>>>>
>>>>> I am using `CountBundleTrigger.java`, but the pre-shuffle aggregation
>>>>> is not working because the `count` variable is always 0. Please let me
>>>>> know if I am missing something.
>>>>>
>>>>>     @Override
>>>>>     public void onElement(T element) throws Exception {
>>>>>         count++;
>>>>>         if (count >= maxCount) {
>>>>>             callback.finishBundle();
>>>>>             reset();
>>>>>         }
>>>>>     }
>>>>>
>>>>> Here is the main code.
>>>>>
>>>>>     MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
>>>>>             new TaxiFareMapBundleFunction();
>>>>>     BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
>>>>>     KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
>>>>>         @Override
>>>>>         public Long getKey(TaxiFare value) throws Exception {
>>>>>             return value.driverId;
>>>>>         }
>>>>>     };
>>>>>
>>>>>     DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>>>>>     //        fares.keyBy((TaxiFare fare) -> fare.driverId)
>>>>>     //             .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());;
>>>>>             fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>>>>>                     new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>>>>>                 .assignTimestampsAndWatermarks(
>>>>>                     new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>>>>>                         @Override
>>>>>                         public long extractTimestamp(TaxiFare element) {
>>>>>                             return element.startTime.getEpochSecond();
>>>>>                         }
>>>>>                     })
>>>>>                 .keyBy((TaxiFare fare) -> fare.driverId)
>>>>>                 .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>>>                 .process(new AddTips());
>>>>>
>>>>>     DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>>>>>             hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>>>>>
>>>>> Thanks
>>>>> Suman
>>>>>
>>>>
