Re: Tumbling window rich functionality
Hi,

apply() will be called once for each key in each window.

On Wed, Oct 12, 2016 at 2:25 PM, Swapnil Chougule wrote:

> Thanks Aljoscha.
>
> Whenever I am using WindowFunction.apply() on keyed stream, apply() will
> be called once or multiple times (equal to number of keys in that windowed
> stream)?
>
> Ex:
> DataStream dataStream = env
>         .socketTextStream("localhost", )
>         .flatMap(new Splitter())
>         .keyBy(0)
>         .timeWindow(Time.seconds(10))
>         .apply(new WindowFunction, Boolean, Tuple, TimeWindow>() {
>
>             @Override
>             public void apply(Tuple key, TimeWindow window,
>                     Iterable> input,
>                     Collector out) throws Exception {
>                 //Some business logic
>             }
>         });
>
> Regards,
> Swapnil
>
> On Wed, Sep 14, 2016 at 2:26 PM, Aljoscha Krettek wrote:
>
>> Hi,
>> WindowFunction.apply() will be called once for each window so you should
>> be able to do the setup/teardown in there. open() and close() are called at
>> the start of processing, end of processing, respectively.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 14 Sep 2016 at 09:04 Swapnil Chougule wrote:
>>
>>> Hi Team,
>>>
>>> I am using tumbling window functionality having window size 5 minutes.
>>> I want to perform setup & teardown functionality for each window. I
>>> tried using RichWindowFunction but it didn't work for me.
>>> Can anybody tell me how can I do it ?
>>>
>>> Attaching code snippet what I tried
>>>
>>> impressions.map(new LineItemAdUnitAggr()).keyBy(0)
>>>         .timeWindow(Time.seconds(300)).apply(new
>>>         RichWindowFunction,Long>,
>>>         Boolean, Tuple, TimeWindow>() {
>>>
>>>             @Override
>>>             public void open(Configuration parameters) throws Exception {
>>>                 super.open(parameters);
>>>                 //setup method
>>>             }
>>>
>>>             public void apply(Tuple key, TimeWindow window,
>>>                     Iterable, Long>> input,
>>>                     Collector out) throws Exception {
>>>                 //do processing
>>>             }
>>>
>>>             @Override
>>>             public void close() throws Exception {
>>>                 //tear down method
>>>                 super.close();
>>>             }
>>>         });
>>>
>>> Thanks,
>>> Swapnil
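To make the "once per key" answer concrete, here is a minimal plain-Java sketch that does not use Flink at all. It only mimics how elements of a keyed stream fall into tumbling windows, and counts how many apply() calls that would imply: one per (window, key) pair that holds at least one element. The class KeyedWindowSketch, the Event type, and the helper methods are hypothetical names made up for this illustration, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; it does not use Flink. All names are hypothetical.
class KeyedWindowSketch {

    // A keyed element with a timestamp in milliseconds (assumed non-negative).
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    // Start of the tumbling window of the given size that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Groups events by window start, then by key.
    // Each inner map entry corresponds to one apply() invocation.
    static Map<Long, Map<String, List<Event>>> assign(List<Event> events, long sizeMs) {
        Map<Long, Map<String, List<Event>>> windows = new TreeMap<>();
        for (Event e : events) {
            windows.computeIfAbsent(windowStart(e.timestampMs, sizeMs), w -> new TreeMap<>())
                   .computeIfAbsent(e.key, k -> new ArrayList<>())
                   .add(e);
        }
        return windows;
    }

    // Number of apply() calls implied: one per non-empty (window, key) pair.
    static int countApplyCalls(List<Event> events, long sizeMs) {
        int calls = 0;
        for (Map<String, List<Event>> perKey : assign(events, sizeMs).values()) {
            calls += perKey.size();
        }
        return calls;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 1_000L),
                new Event("b", 2_000L),
                new Event("a", 3_000L),
                new Event("a", 12_000L));
        // Window [0s, 10s) holds keys "a" and "b"; window [10s, 20s) holds key "a".
        System.out.println(countApplyCalls(events, 10_000L)); // prints 3
    }
}
```

With a 10-second window, keys "a" and "b" in the first window and key "a" in the second give three calls in total, so any per-window setup placed inside apply() would run three times here, not twice.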
Re: Tumbling window rich functionality
Thanks Aljoscha.

When I use WindowFunction.apply() on a keyed stream, will apply() be called once, or multiple times (equal to the number of keys in that windowed stream)?

Ex:
DataStream dataStream = env
        .socketTextStream("localhost", )
        .flatMap(new Splitter())
        .keyBy(0)
        .timeWindow(Time.seconds(10))
        .apply(new WindowFunction, Boolean, Tuple, TimeWindow>() {

            @Override
            public void apply(Tuple key, TimeWindow window,
                    Iterable> input,
                    Collector out) throws Exception {
                //Some business logic
            }
        });

Regards,
Swapnil

On Wed, Sep 14, 2016 at 2:26 PM, Aljoscha Krettek wrote:

> Hi,
> WindowFunction.apply() will be called once for each window so you should
> be able to do the setup/teardown in there. open() and close() are called at
> the start of processing, end of processing, respectively.
>
> Cheers,
> Aljoscha
>
> On Wed, 14 Sep 2016 at 09:04 Swapnil Chougule wrote:
>
>> Hi Team,
>>
>> I am using tumbling window functionality having window size 5 minutes.
>> I want to perform setup & teardown functionality for each window. I tried
>> using RichWindowFunction but it didn't work for me.
>> Can anybody tell me how can I do it ?
>>
>> Attaching code snippet what I tried
>>
>> impressions.map(new LineItemAdUnitAggr()).keyBy(0)
>>         .timeWindow(Time.seconds(300)).apply(new
>>         RichWindowFunction,Long>,
>>         Boolean, Tuple, TimeWindow>() {
>>
>>             @Override
>>             public void open(Configuration parameters) throws Exception {
>>                 super.open(parameters);
>>                 //setup method
>>             }
>>
>>             public void apply(Tuple key, TimeWindow window,
>>                     Iterable, Long>> input,
>>                     Collector out) throws Exception {
>>                 //do processing
>>             }
>>
>>             @Override
>>             public void close() throws Exception {
>>                 //tear down method
>>                 super.close();
>>             }
>>         });
>>
>> Thanks,
>> Swapnil
Re: Tumbling window rich functionality
Hi,

WindowFunction.apply() will be called once for each window so you should be able to do the setup/teardown in there. open() and close() are called at the start of processing, end of processing, respectively.

Cheers,
Aljoscha

On Wed, 14 Sep 2016 at 09:04 Swapnil Chougule wrote:

> Hi Team,
>
> I am using tumbling window functionality having window size 5 minutes.
> I want to perform setup & teardown functionality for each window. I tried
> using RichWindowFunction but it didn't work for me.
> Can anybody tell me how can I do it ?
>
> Attaching code snippet what I tried
>
> impressions.map(new
> LineItemAdUnitAggr()).keyBy(0).timeWindow(Time.seconds(300)).apply(new
> RichWindowFunction,Long>, Boolean, Tuple,
> TimeWindow>() {
>
>             @Override
>             public void open(Configuration parameters) throws Exception {
>                 super.open(parameters);
>                 //setup method
>             }
>
>             public void apply(Tuple key, TimeWindow window,
>                     Iterable, Long>> input,
>                     Collector out) throws Exception {
>                 //do processing
>             }
>
>             @Override
>             public void close() throws Exception {
>                 //tear down method
>                 super.close();
>             }
>         });
>
> Thanks,
> Swapnil
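The difference between the two lifecycles described above can be sketched in plain Java, without Flink. The class WindowLifecycleSketch and its methods are hypothetical stand-ins for open(), apply() and close(); the driver calls them in the order the reply describes (open once, apply once per fired window, close once), and the per-window setup and teardown sit inside apply(). The keys, window starts and values are made-up sample data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; it does not use Flink. All names are hypothetical.
class WindowLifecycleSketch {

    // Ordered record of lifecycle calls, so the sequence can be inspected.
    private final List<String> log = new ArrayList<>();

    // Stands in for open(): called once, before any window is processed.
    void open() {
        log.add("open");
    }

    // Stands in for apply(): called each time a window fires for a key.
    void apply(String key, long windowStart, List<Integer> input) {
        log.add("setup " + key + "@" + windowStart);        // per-window setup
        try {
            int sum = 0;
            for (int value : input) {
                sum += value;
            }
            log.add("sum=" + sum);                          // the actual processing
        } finally {
            log.add("teardown " + key + "@" + windowStart); // per-window teardown
        }
    }

    // Stands in for close(): called once, after the last window.
    void close() {
        log.add("close");
    }

    // Drives the function through two 5-minute windows for a single key.
    static List<String> runTwoWindows() {
        WindowLifecycleSketch fn = new WindowLifecycleSketch();
        fn.open();
        fn.apply("a", 0L, Arrays.asList(1, 2));
        fn.apply("a", 300_000L, Arrays.asList(3));
        fn.close();
        return fn.log;
    }

    public static void main(String[] args) {
        for (String entry : runTwoWindows()) {
            System.out.println(entry);
        }
    }
}
```

The log shows a setup/teardown pair around each window, while "open" and "close" appear exactly once at the two ends. That is why putting per-window setup in open() would not have the intended effect. The try/finally is a design choice in this sketch so that teardown still runs if the processing step throws.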