> sideoutput might deserve a separate class which inherits from
> singleoutput. It might prevent a lot of confusion.

Thanks, but how could that be done? Do you mean that if one calls
.process(), the stream would change to another class that only allows
calls like .getMainOutput() or .getSideOutput("name")? Of course, a
compile-time error would be even better than a runtime error, but I don't
yet see how it could be done in practice.
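One way to picture the "separate class" idea: if only the direct result of .process() had a type exposing getSideOutput(), while .filter() returned a plain stream type, then the .process().filter().getSideOutput() chain from the original example would be rejected by the compiler. In Flink, both process() and filter() return SingleOutputStreamOperator, which defines getSideOutput(), so that chain compiles. The sketch below is a toy, eager, list-based model in plain Java; MainStream, ProcessedStream, Tag, Ctx and ProcessFn are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Toy model of compile-time side-output safety. All types are hypothetical,
// not Flink classes; "streams" are eager lists purely for illustration.
public class TypedSideOutputs {

    /** Hypothetical stand-in for an output tag. */
    record Tag(String id) {}

    /** Hypothetical context through which a process function emits side outputs. */
    interface Ctx { void output(Tag tag, String value); }

    /** Hypothetical process function: writes to the main output list or via the context. */
    interface ProcessFn { void apply(String value, List<String> main, Ctx ctx); }

    /** A plain stream: it can be filtered or processed, but has no getSideOutput(). */
    static class MainStream {
        final List<String> elements;

        MainStream(List<String> elements) { this.elements = elements; }

        MainStream filter(Predicate<String> keep) {
            List<String> out = new ArrayList<>();
            for (String s : elements) {
                if (keep.test(s)) out.add(s);
            }
            return new MainStream(out);
        }

        ProcessedStream process(ProcessFn fn) {
            List<String> main = new ArrayList<>();
            Map<Tag, List<String>> side = new HashMap<>();
            for (String s : elements) {
                fn.apply(s, main, (tag, v) -> side.computeIfAbsent(tag, k -> new ArrayList<>()).add(v));
            }
            return new ProcessedStream(main, side);
        }
    }

    /** Only the direct result of process() can hand out its side outputs. */
    static final class ProcessedStream extends MainStream {
        private final Map<Tag, List<String>> side;

        ProcessedStream(List<String> main, Map<Tag, List<String>> side) {
            super(main);
            this.side = side;
        }

        MainStream getSideOutput(Tag tag) {
            return new MainStream(side.getOrDefault(tag, List.of()));
        }
    }

    public static void main(String[] args) {
        Tag sideTag = new Tag("side-output");
        MainStream source = new MainStream(List.of("a", "b"));

        ProcessedStream processed = source.process((s, main, ctx) -> {
            if ("a".equals(s)) main.add(s); else ctx.output(sideTag, s);
        });

        // Would NOT compile: filter() returns MainStream, which has no getSideOutput().
        // source.process(...).filter(s -> true).getSideOutput(sideTag);

        System.out.println(processed.filter(s -> true).elements);      // [a]
        System.out.println(processed.getSideOutput(sideTag).elements); // [b]
    }
}
```

The design choice is simply that the subtype carrying side outputs is lost as soon as any other transformation is applied, so the misuse becomes a type error rather than a silent empty stream.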

On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin <qinnc...@gmail.com> wrote:

> Hi Juho,
>
> I think sideoutput might deserve a separate class which inherits from
> singleoutput. It might prevent a lot of confusion. A more generic question
> is whether the DataStream API can natively support multiple inputs and
> multiple outputs. Coming from a single-process system to a multi-process
> system, it becomes more of a scheduling problem: which one should get
> resources, and how much, when sharing the same hardware. I guess it will
> open the gate to lots of edge cases -> strategies -> more edge cases :)
>
> Chen
>
> On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio <juho.au...@rovio.com> wrote:
>
>> Maybe I could express it in a slightly different way: if adding
>> .filter() after .process() causes the side output to be somehow totally
>> "lost", then I believe .getSideOutput() could be aware that there is no
>> such side output to listen to from upstream, and throw an exception. I
>> mean, this should be possible when building the DAG; it shouldn't require
>> starting the stream to detect it? Thanks.
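This proposal assumes the graph knows, at build time, which tags each operator can emit. Gordon's reply points out that an operator does not know whether it will ever emit an element with a given tag, so such a check would need operators to declare their tags up front. A minimal sketch under that assumption; GraphNode, declaredTags and sideOutput are hypothetical names, not Flink API.

```java
import java.util.List;
import java.util.Set;

// Hypothetical graph-building model: each node declares up front which side-output
// tags it may emit, so requesting an undeclared tag fails before anything runs.
public class BuildTimeSideOutputCheck {

    /** Hypothetical DAG node; declaredTags is an assumed, user-supplied declaration. */
    record GraphNode(String name, Set<String> declaredTags, List<GraphNode> inputs) {

        /** Adds a virtual node reading `tag` from this node, or throws if this node never emits it. */
        GraphNode sideOutput(String tag) {
            if (!declaredTags.contains(tag)) {
                throw new IllegalArgumentException(
                        "Operator '" + name + "' does not declare side output '" + tag + "'");
            }
            return new GraphNode("side:" + tag, Set.of(), List.of(this));
        }
    }

    public static void main(String[] args) {
        GraphNode source = new GraphNode("source", Set.of(), List.of());
        GraphNode process = new GraphNode("process", Set.of("side-output"), List.of(source));
        GraphNode filter = new GraphNode("filter", Set.of(), List.of(process));

        System.out.println(process.sideOutput("side-output").name()); // side:side-output

        try {
            filter.sideOutput("side-output");
        } catch (IllegalArgumentException e) {
            // The misuse from the original example is caught while building the graph.
            System.out.println("Rejected while building the graph: " + e.getMessage());
        }
    }
}
```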
>>
>> On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>
>>> Hi Juho,
>>>
>>> > Now that I think of it, this seems like a bug to me: why does the call to
>>> > getSideOutput succeed if it doesn't provide _any_ input?
>>>
>>>
>>> With the way side outputs work, I don’t think this is possible (or would
>>> make sense). An operator does not know whether or not it would ever emit
>>> some element with a given tag.
>>> As far as I understand it, calling `getSideOutput` essentially adds a
>>> virtual node to the result stream graph that listens to the specified tag
>>> from the upstream input.
>>>
>>> While I’m not sure whether your observation is expected behavior, I can
>>> see why it is a bit confusing from an API perspective.
>>> Aljoscha would be the expert here; maybe he’ll have more insights. I’ve
>>> looped him in (cc’ed).
>>>
>>> Cheers,
>>> Gordon
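Gordon's description of getSideOutput() as a virtual node listening to a tag on its upstream input can be modeled as routing by (producing operator, tag): the side-output node only receives records that the specific operator it was attached to emits under that tag. A pass-through filter never emits tagged records, so a side-output node attached to it simply receives nothing, and no error is raised. A toy sketch; Emission and readSideOutput are hypothetical names, and this is not Flink's actual runtime implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Toy routing model: every record carries the operator that emitted it and an
// optional tag (null = main output). Hypothetical names; not Flink internals.
public class VirtualSideOutputNode {

    record Emission(String operator, String tag, String value) {}

    /** What a virtual side-output node attached to `operator` would receive for `tag`. */
    static List<String> readSideOutput(List<Emission> emitted, String operator, String tag) {
        List<String> received = new ArrayList<>();
        for (Emission e : emitted) {
            if (e.operator().equals(operator) && tag.equals(e.tag())) {
                received.add(e.value());
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<Emission> emitted = new ArrayList<>();
        for (String s : List.of("a", "b")) {
            if ("a".equals(s)) {
                // process(): "a" goes to the main output...
                emitted.add(new Emission("process", null, s));
                // ...and filter() sees only that main output, re-emitting it untagged.
                emitted.add(new Emission("filter", null, s));
            } else {
                // process(): anything else goes to the "side-output" tag.
                emitted.add(new Emission("process", "side-output", s));
            }
        }

        System.out.println(readSideOutput(emitted, "process", "side-output")); // [b]
        System.out.println(readSideOutput(emitted, "filter", "side-output"));  // []
    }
}
```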
>>>
>>>
>>> On 12 January 2018 at 4:05:13 PM, Juho Autio (juho.au...@rovio.com) wrote:
>>>
>>> When I run the code below (Flink 1.4.0 or 1.3.1), only "a" is printed.
>>> If I switch the positions of .process() and .filter() (i.e., filter first,
>>> then process), both "a" and "b" are printed, as expected.
>>>
>>> I guess it's a bit hard to say what the side output should include in
>>> this case: the stream before filtering or after it?
>>>
>>> What I would suggest is that Flink protect against this kind of user
>>> error, which is hard to debug. Would it be possible for Flink to throw an
>>> exception if one calls .getSideOutput() on anything that doesn't actually
>>> provide that side output? Now that I think of it, this seems like a bug to
>>> me: why does the call to getSideOutput succeed if it doesn't provide
>>> _any_ input? I would expect it to get the side output data stream _before_
>>> applying .filter().
>>>
>>> import org.apache.flink.api.common.functions.FilterFunction;
>>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>>> import org.apache.flink.util.Collector;
>>> import org.apache.flink.util.OutputTag;
>>>
>>> public class SideOutputProblem {
>>>
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>         DataStreamSource<String> stream = env.fromElements("a", "b");
>>>         // Anonymous subclass ({}) so that Flink can capture the tag's generic type.
>>>         OutputTag<String> sideOutputTag = new OutputTag<String>("side-output"){};
>>>
>>>         // Note: "processed" refers to the operator created by .filter(), not by .process().
>>>         SingleOutputStreamOperator<String> processed = stream
>>>
>>>                 // "a" goes to the main output; everything else goes to the side output.
>>>                 .process(new ProcessFunction<String, String>() {
>>>                     @Override
>>>                     public void processElement(String s, Context context, Collector<String> collector) throws Exception {
>>>                         if ("a".equals(s)) {
>>>                             collector.collect(s);
>>>                         } else {
>>>                             context.output(sideOutputTag, s);
>>>                         }
>>>                     }
>>>                 })
>>>
>>>                 // Pass-through filter: keeps every element of the main output.
>>>                 .filter(new FilterFunction<String>() {
>>>                     @Override
>>>                     public boolean filter(String s) throws Exception {
>>>                         return true;
>>>                     }
>>>                 });
>>>
>>>         // Reads sideOutputTag from the filter operator, which never emits to it,
>>>         // so nothing is printed to stderr.
>>>         processed.getSideOutput(sideOutputTag).printToErr();
>>>
>>>         // Main output: only "a" reaches this sink.
>>>         processed.print();
>>>
>>>         env.execute();
>>>     }
>>>
>>> }
>>>
>>> Cheers,
>>> Juho
>>>
>>>
>>
>
