It would mean that process() would return a SingleOutputWithSideOutputOperator, which extends SingleOutputOperator and additionally offers getSideOutput(). Other transformations would still return a SingleOutputOperator.

With this, the following code wouldn't compile:

stream
    .process(...)
    .filter(...)
    .getSideOutput(...) // compile error

You would have to explicitly define the code as below, which makes the behavior unambiguous:

processed = stream
    .process(...)

filtered = processed
    .filter(...)

filteredSideOutput = processed
    .getSideOutput(...)
    .filter(...)
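
As a rough illustration of the idea above, here is a minimal, self-contained sketch that does not use Flink at all. The classes SingleOutputOperator and SingleOutputWithSideOutputOperator are toy stand-ins named after the proposal (they are not existing Flink classes), and the list-backed process(), filter() and getSideOutput() are assumptions made only to show how the return types would rule out the ambiguous chain at compile time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class SideOutputTypingSketch {

    /** Toy stand-in for an ordinary operator result: it has no getSideOutput(). */
    static class SingleOutputOperator<T> {
        final List<T> main;

        SingleOutputOperator(List<T> main) {
            this.main = main;
        }

        /** Like any other transformation, filter() returns the plain type. */
        SingleOutputOperator<T> filter(Predicate<T> keep) {
            List<T> out = new ArrayList<>();
            for (T element : main) {
                if (keep.test(element)) {
                    out.add(element);
                }
            }
            return new SingleOutputOperator<>(out);
        }
    }

    /** Toy stand-in for the result of process(): the only type exposing getSideOutput(). */
    static class SingleOutputWithSideOutputOperator<T> extends SingleOutputOperator<T> {
        private final List<T> side;

        SingleOutputWithSideOutputOperator(List<T> main, List<T> side) {
            super(main);
            this.side = side;
        }

        SingleOutputOperator<T> getSideOutput() {
            return new SingleOutputOperator<>(side);
        }
    }

    /** Toy process(): elements matching toMain go to the main output, the rest to the side output. */
    static <T> SingleOutputWithSideOutputOperator<T> process(List<T> input, Predicate<T> toMain) {
        List<T> main = new ArrayList<>();
        List<T> side = new ArrayList<>();
        for (T element : input) {
            if (toMain.test(element)) {
                main.add(element);
            } else {
                side.add(element);
            }
        }
        return new SingleOutputWithSideOutputOperator<>(main, side);
    }

    public static void main(String[] args) {
        SingleOutputWithSideOutputOperator<String> processed =
                process(Arrays.asList("a", "b"), "a"::equals);

        SingleOutputOperator<String> filtered = processed.filter(s -> true);
        SingleOutputOperator<String> filteredSideOutput =
                processed.getSideOutput().filter(s -> true);

        System.out.println(filtered.main);           // [a]
        System.out.println(filteredSideOutput.main); // [b]

        // The ambiguous chain is rejected by the compiler, because filter()
        // returns the plain SingleOutputOperator type:
        // processed.filter(s -> true).getSideOutput();
    }
}
```

In this sketch the side output can only be reached from the value that process() returned, so whether a later filter() applies to it is always spelled out in the code.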

On 15.01.2018 09:55, Juho Autio wrote:
> sideoutput might deserve a separate class which inherits from singleoutput. It might prevent a lot of confusion

Thanks, but how could that be done? Do you mean that if one calls .process(), then the stream would change to another class which would only allow calls like .getMainOutput() or .getSideOutput("name")? Of course a compile-time error would be even better than a runtime error, but I don't yet see how it could be done in practice.

On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin <qinnc...@gmail.com <mailto:qinnc...@gmail.com>> wrote:

    Hi Juho,

    I think sideoutput might deserve a separate class which inherits
    from singleoutput. It might prevent a lot of confusion. A more
    generic question is whether the datastream api can natively support
    multiple ins and multiple outs. It's more like a scheduling problem
    when you come from a single process system to a multiple process
    system: which one should get resources, and how much, when sharing
    the same hardware resources? I guess it will open the gate to lots
    of edge cases -> strategies -> more edge cases :)

    Chen

    On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio <juho.au...@rovio.com
    <mailto:juho.au...@rovio.com>> wrote:

        Maybe I could express it in a slightly different way: if
        adding the .filter() after .process() causes the side output
        to be somehow totally "lost", then I believe
        .getSideOutput() could be aware that there is no such side
        output to listen to from upstream, and throw an
        exception. I mean, this should be possible when building the
        DAG; it shouldn't require starting the stream to detect it? Thanks.

        On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai
        <tzuli...@apache.org <mailto:tzuli...@apache.org>> wrote:

            Hi Juho,

            > Now that I think of it this seems like a bug to me: why
            > does the call to getSideOutput succeed if it doesn't
            > provide _any_ input?

            With the way side outputs work, I don’t think this is
            possible (or would make sense). An operator does not know
            whether or not it would ever emit some element with a
            given tag.
            As far as I understand it, calling `getSideOutput`
            essentially adds a virtual node to the result stream graph
            that listens to the specified tag from the upstream input.

            While I’m not aware whether or not your observation is
            expected behavior, from an API perspective, I can see why
            it is a bit confusing for you.
            Aljoscha would be the expert here, maybe he’ll have more
            insights. I’ve cc’ed him.

            Cheers,
            Gordon


            On 12 January 2018 at 4:05:13 PM, Juho Autio
            (juho.au...@rovio.com <mailto:juho.au...@rovio.com>) wrote:

            When I run the code below (Flink 1.4.0 or 1.3.1), only
            "a" is printed. If I switch the position of .process() &
            .filter() (ie. filter first, then process), both "a" &
            "b" are printed, as expected.

            I guess it's a bit hard to say what the side output
            should include in this case: the stream before filtering
            or after it?

            What I would suggest is for Flink to protect against this
            kind of user error, which is hard to debug. Would it be
            possible that Flink throws an exception if one tries to
            call .getSideOutput() on anything that doesn't actually
            provide that side output? Now that I think of it this
            seems like a bug to me: why does the call to
            getSideOutput succeed if it doesn't provide _any_ input?
            I would expect it to get the side output data stream
            _before_ applying .filter().

            import org.apache.flink.api.common.functions.FilterFunction;
            import org.apache.flink.streaming.api.datastream.DataStreamSource;
            import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
            import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
            import org.apache.flink.streaming.api.functions.ProcessFunction;
            import org.apache.flink.util.Collector;
            import org.apache.flink.util.OutputTag;

            public class SideOutputProblem {

                public static void main(String[] args) throws Exception {

                    StreamExecutionEnvironment env =
                            StreamExecutionEnvironment.getExecutionEnvironment();
                    DataStreamSource<String> stream = env.fromElements("a", "b");
                    OutputTag<String> sideOutputTag = new OutputTag<String>("side-output"){};

                    SingleOutputStreamOperator<String> processed = stream

                            // "a" goes to the main output, everything else to the side output
                            .process(new ProcessFunction<String, String>() {
                                @Override
                                public void processElement(String s, Context context,
                                        Collector<String> collector) throws Exception {
                                    if ("a".equals(s)) {
                                        collector.collect(s);
                                    } else {
                                        context.output(sideOutputTag, s);
                                    }
                                }
                            })

                            // pass-through filter chained after process()
                            .filter(new FilterFunction<String>() {
                                @Override
                                public boolean filter(String s) throws Exception {
                                    return true;
                                }
                            });

                    // called on the result of filter(), not on the result of process()
                    processed.getSideOutput(sideOutputTag).printToErr();

                    processed.print();

                    env.execute();
                }

            }

            Cheers,
            Juho




