Hi Vijay,

When using windows, you can call 'trigger' on the windowed stream to set a
custom Trigger, which determines when your *ProcessWindowFunction* is invoked.

In your case, you would probably use:

> *.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))*
>
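
To illustrate what that trigger changes, here is a minimal plain-Java sketch.
It has no Flink dependency, so it only simulates the behaviour and is not
Flink code. It assumes a single 4 hr window in which the running count and
sum are emitted every 5 mins while the accumulated state is kept between
firings (as far as I know, the continuous trigger fires without purging).
The names PeriodicWindowEmitSketch, Snapshot and simulate, and the sample
events, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (not Flink API) of periodic firing inside one long window. */
public class PeriodicWindowEmitSketch {

    /** One emitted result: the fire time and the running aggregates at that moment. */
    public static final class Snapshot {
        public final long fireTimeMs;
        public final long count;
        public final double sum;

        Snapshot(long fireTimeMs, long count, double sum) {
            this.fireTimeMs = fireTimeMs;
            this.count = count;
            this.sum = sum;
        }
    }

    /**
     * Replays events (assumed sorted by timestamp) for the single window
     * [windowStartMs, windowStartMs + windowSizeMs) and emits the running
     * count and sum every intervalMs. State is kept between firings.
     * Assumption of this sketch: a final snapshot is emitted at the window end.
     */
    public static List<Snapshot> simulate(long[] timestampsMs, double[] values,
                                          long windowStartMs, long windowSizeMs,
                                          long intervalMs) {
        List<Snapshot> out = new ArrayList<>();
        long windowEndMs = windowStartMs + windowSizeMs;
        long count = 0;
        double sum = 0.0;
        int next = 0; // index of the next event not yet folded into the aggregates

        for (long fire = windowStartMs + intervalMs; fire <= windowEndMs; fire += intervalMs) {
            // Fold in every event that arrived before this firing.
            while (next < timestampsMs.length && timestampsMs[next] < fire) {
                if (timestampsMs[next] >= windowStartMs) {
                    count++;
                    sum += values[next];
                }
                next++;
            }
            // Emit the current totals without clearing them.
            out.add(new Snapshot(fire, count, sum));
        }
        return out;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] timestamps = {1 * minute, 4 * minute, 7 * minute, 12 * minute};
        double[] values = {10.0, 5.0, 2.5, 1.0};

        // 4 hr window, results pushed out every 5 mins.
        List<Snapshot> snapshots = simulate(timestamps, values, 0L, 240 * minute, 5 * minute);
        for (int i = 0; i < 3; i++) {
            Snapshot s = snapshots.get(i);
            System.out.println("fired at " + (s.fireTimeMs / minute) + " min: count="
                    + s.count + ", sum=" + s.sum);
        }
        System.out.println("total firings: " + snapshots.size());
    }
}
```

In the actual job this would correspond to setting the trigger on the 4 hr
timeWindow of the KeyedStream, so that the ProcessWindowFunction is called on
every firing rather than only at the end of the window.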

Thanks,
Rafi


On Mon, Jun 17, 2019 at 9:01 PM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> I am also implementing the ProcessWindowFunction and accessing the
> windowState to get data, but how do I push data out every 5 mins during a
> 4 hr time window? I am adding a globalState to handle the 4 hr window.
> Or should I still use context.windowState even for the 4 hr window?
>
> public class MGroupingAggregateClass extends ProcessWindowFunction<....> {
>>
>> private MapState<String, Object> timedGroupKeyState;
>> private MapState<String, Object> globalGroupKeyState;
>> private final MapStateDescriptor<String, Object> timedMapKeyStateDescriptor =
>>         new MapStateDescriptor<>("timedGroupKeyState", String.class, Object.class);
>> private final MapStateDescriptor<String, Object> globalMapKeyStateDescriptor =
>>         new MapStateDescriptor<>("globalGroupKeyState", String.class, Object.class);
>>
>> public void open(Configuration ..) {
>>     timedGroupKeyState = getRuntimeContext().getMapState(timedMapKeyStateDescriptor);
>>     globalGroupKeyState = getRuntimeContext().getMapState(globalMapKeyStateDescriptor);
>> }
>>
>> public void process(MonitoringTuple currKey, Context context,
>>                     Iterable<Map<String, Object>> elements,
>>                     Collector<Map<String, Object>> out) throws Exception {
>>     logger.info("Entered MGroupingAggregateWindowProcessing - process interval:{}, currKey:{}", interval, currKey);
>>     timedGroupKeyState = context.windowState().getMapState(timedMapKeyStateDescriptor);
>>     globalGroupKeyState = context.globalState().getMapState(globalMapKeyStateDescriptor);
>> ...
>> // get data from state
>> Object timedGroupStateObj = timedGroupKeyState.get(groupKey);
>>
>> // how do I push the data out every 5 mins to the sink during the 4 hr window?
>>
>> }
>>
>
> On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi,
>> I need to calculate count and sum over a 4 hour time window, with the
>> current results being output every 5 mins.
>> How do I do that?
>> Currently, I calculate results for 5 sec and 5 min time windows fine on
>> the KeyedStream.
>>
>>> Time timeWindow = getTimeWindowFromInterval(interval); // e.g. timeWindow = Time.seconds(timeIntervalL);
>>> KeyedStream<Map<String, Object>, ...> monitoringTupleKeyedStream =
>>>         kinesisStream.keyBy(...);
>>> final WindowedStream<Map<String, Object>, ...., TimeWindow> windowStream =
>>>         monitoringTupleKeyedStream
>>>                 .timeWindow(timeWindow);
>>> DataStream<....> enrichedMGStream = windowStream.aggregate(
>>>         new MGroupingWindowAggregateClass(...),
>>>         new MGroupingAggregateClass(....))
>>>         .map(new Monitoring...(...));
>>> enrichedMGStream.addSink(..);
>>>
>>
>>
>> TIA,
>> Vijay
>>
>
