Re: Can not get OutPutTag datastream from Windowing function

2018-07-17 Thread Xingcan Cui
Hi Soheil,

The `getSideOutput()` method is defined on the operator rather than on the 
datastream.
You can invoke it on the result of any operation (e.g., map, window) applied 
to a datastream.

Best,
Xingcan

> On Jul 17, 2018, at 3:36 PM, Soheil Pourbafrani wrote:
> 
> Hi, according to the documents I tried to get late data using side output.
> 
> final OutputTag> lateOutputTag = new 
> OutputTag>("late-data"){};
> DataStream> res = aggregatedTuple
> .assignTimestampsAndWatermarks(new Bound())
> }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
> .allowedLateness(Time.milliseconds(2))
> .sideOutputLateData(lateOutputTag)
> .reduce(Do some process);
> 
> When trying to store late data in a Datastream (As shown in document):
> DataStream> lateData = res.
> there is no predefined getSideOutput method on DataStream res!
> But if I call getSideOutput just after reduce function, it is known! But I 
> don't want to save late data on res variable and I want to save them on 
> another variable!
> DataStream> res = aggregatedTuple
> .assignTimestampsAndWatermarks(new Bound())
> }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
> .allowedLateness(Time.milliseconds(2))
> .sideOutputLateData(lateOutputTag)
> .reduce(Do some process)
>  .getSideOutput(lateOutputTag);
> What is the problem here?
> 
> 



Re: Can not get OutPutTag datastream from Windowing function

2018-07-17 Thread Dawid Wysakowicz
Hi Soheil,

The `getSideOutput` method is part of `SingleOutputStreamOperator`, which
extends `DataStream`. Try using `SingleOutputStreamOperator` as the
type for your res variable.

Best,

Dawid
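
The typing issue described above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not Flink code: `Stream`, `OperatorStream`, and `SideOutputTypingSketch` are hypothetical stand-ins, with `OperatorStream` playing the role of `SingleOutputStreamOperator` and `Stream` the role of `DataStream`. It only shows why a method declared on the subclass is not visible through a variable declared with the supertype.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the supertype; NOT the Flink DataStream class.
class Stream<T> {
    protected final List<T> elements;

    Stream(List<T> elements) {
        this.elements = elements;
    }

    List<T> collect() {
        return new ArrayList<>(elements);
    }
}

// Stand-in for the subtype that adds the side-output accessor.
class OperatorStream<T> extends Stream<T> {
    private final List<T> late;

    OperatorStream(List<T> onTime, List<T> late) {
        super(onTime);
        this.late = late;
    }

    // Only declared here, so it is invisible through a Stream-typed variable.
    Stream<T> getSideOutput() {
        return new Stream<>(late);
    }
}

class SideOutputTypingSketch {
    // Hypothetical operation that returns the subtype, as reduce() does in the thread.
    static OperatorStream<Integer> reduceWithLateData() {
        return new OperatorStream<>(List.of(1, 2, 3), List.of(99));
    }

    static List<Integer> lateViaSubtypeVariable() {
        // Declaring res with the subtype keeps getSideOutput() callable later,
        // so the late data can live in its own variable.
        OperatorStream<Integer> res = reduceWithLateData();
        Stream<Integer> lateData = res.getSideOutput();
        return lateData.collect();
    }

    static List<Integer> mainViaSupertypeVariable() {
        // With the supertype, res.getSideOutput() would be a compile error,
        // even though the object itself is still an OperatorStream.
        Stream<Integer> res = reduceWithLateData();
        return res.collect();
    }

    public static void main(String[] args) {
        System.out.println(lateViaSubtypeVariable());
        System.out.println(mainViaSupertypeVariable());
    }
}
```

Applied to the job in this thread, the same idea would mean declaring res as `SingleOutputStreamOperator` instead of `DataStream` and then assigning `res.getSideOutput(lateOutputTag)` to a separate variable.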


On 17/07/18 09:36, Soheil Pourbafrani wrote:
> Hi, according to the documents I tried to get late data using side
> output.
>
> final OutputTag> lateOutputTag = new 
> OutputTag>("late-data"){};
> DataStream> res = aggregatedTuple
> .assignTimestampsAndWatermarks(new Bound())
> }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/ 
> .allowedLateness(Time.milliseconds(2))
> .sideOutputLateData(lateOutputTag)
> .reduce(Do some process);
>
> When trying to store late data in a Datastream (As shown in document):
> DataStream> lateData = res.
> there is no predefined getSideOutput method on DataStream res!
> But if I call getSideOutput just after reduce function, it is known!
> But I don't want to save late data on res variable and I want to save
> them on another variable!
> DataStream> res = aggregatedTuple
> .assignTimestampsAndWatermarks(new Bound())
> }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/ 
> .allowedLateness(Time.milliseconds(2))
> .sideOutputLateData(lateOutputTag)
> .reduce(Do some process)
>  .getSideOutput(lateOutputTag);
> What is the problem here?
>
>





Can not get OutPutTag datastream from Windowing function

2018-07-17 Thread Soheil Pourbafrani
Hi, according to the documents I tried to get late data using side output.

final OutputTag> lateOutputTag = new
OutputTag>("late-data"){};

DataStream> res = aggregatedTuple
.assignTimestampsAndWatermarks(new Bound())
}).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
.allowedLateness(Time.milliseconds(2))
.sideOutputLateData(lateOutputTag)
.reduce(Do some process);


When trying to store late data in a Datastream (As shown in document):

DataStream> lateData = res.

there is no predefined getSideOutput method on DataStream res!
But if I call getSideOutput just after reduce function, it is known! But I
don't want to save late data on res variable and I want to save them on
another variable!

DataStream> res = aggregatedTuple
.assignTimestampsAndWatermarks(new Bound())
}).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
.allowedLateness(Time.milliseconds(2))
.sideOutputLateData(lateOutputTag)
.reduce(Do some process)
.getSideOutput(lateOutputTag);

What is the problem here?