Re: Can not get OutPutTag datastream from Windowing function
Hi Soheil, The `getSideOutput()` method is defined on the operator instead of the datastream. You can invoke it after any action (e.g., map, window) performed on a datastream. Best, Xingcan > On Jul 17, 2018, at 3:36 PM, Soheil Pourbafrani wrote: > > Hi, according to the documents I tried to get late data using side output. > > final OutputTag> lateOutputTag = new > OutputTag>("late-data"){}; > DataStream> res = aggregatedTuple > .assignTimestampsAndWatermarks(new Bound()) > }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/ > .allowedLateness(Time.milliseconds(2)) > .sideOutputLateData(lateOutputTag) > .reduce(Do some process); > > When trying to store late data in a Datastream (As shown in document): > DataStream> lateData = res. > there is no predefined getSideOutput method on DataStream res! > But if I call getSideOutput just after reduce function, it is known! But I > don't want to save late data on res variable and I want to save them on > another variable! > DataStream> res = aggregatedTuple > .assignTimestampsAndWatermarks(new Bound()) > }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/ > .allowedLateness(Time.milliseconds(2)) > .sideOutputLateData(lateOutputTag) > .reduce(Do some process) > .getSideoutput(lateOutputTag); > What is the problem here? > >
Re: Can not get OutPutTag datastream from Windowing function
Hi Soheil, The /getSideOutput/ method is part of /SingleOutputStreamOperator/ which extends /DataStream///. Try using /SingleOutputStreamOperator/ as the type for your res variable. Best, Dawid On 17/07/18 09:36, Soheil Pourbafrani wrote: > Hi, according to theĀ documents I tried to get late data using side > output. > > final OutputTag> lateOutputTag = new > OutputTag>("late-data"){}; > DataStream> res = aggregatedTuple > .assignTimestampsAndWatermarks(new Bound()) > }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/ > .allowedLateness(Time.milliseconds(2)) > .sideOutputLateData(lateOutputTag) > .reduce(Do some process); > > When trying to store late data in a Datastream (As shown in document): > DataStream> lateData = res. > there is no predefined getSideOutput method on DataStream res! > But if I callĀ getSideOutput just after reduce function, it is known! > But I don't want to save late data on res variable and I want to save > them on another variable! > DataStream> res = aggregatedTuple > .assignTimestampsAndWatermarks(new Bound()) > }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/ > .allowedLateness(Time.milliseconds(2)) > .sideOutputLateData(lateOutputTag) > .reduce(Do some process) > .getSideoutput(lateOutputTag); > What is the problem here? > > signature.asc Description: OpenPGP digital signature
Can not get OutPutTag datastream from Windowing function
Hi, according to the documents I tried to get late data using side output. final OutputTag> lateOutputTag = new OutputTag>("late-data"){}; DataStream> res = aggregatedTuple .assignTimestampsAndWatermarks(new Bound()) }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/ .allowedLateness(Time.milliseconds(2)) .sideOutputLateData(lateOutputTag) .reduce(Do some process); When trying to store late data in a Datastream (As shown in document): DataStream> lateData = res. there is no predefined getSideOutput method on DataStream res! But if I call getSideOutput just after reduce function, it is known! But I don't want to save late data on res variable and I want to save them on another variable! DataStream> res = aggregatedTuple .assignTimestampsAndWatermarks(new Bound()) }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/ .allowedLateness(Time.milliseconds(2)) .sideOutputLateData(lateOutputTag) .reduce(Do some process) .getSideoutput(lateOutputTag); What is the problem here?