I've opened https://issues.apache.org/jira/browse/FLINK-8437

Unfortunately, I doubt we can fix this properly. The proposed solution will not work if we ever allow arbitrary functions to use side outputs.

On 16.01.2018 08:59, Juho Autio wrote:
Could someone with knowledge of the right terms create this in JIRA, please? I guess I could also create it myself if needed.

On Mon, Jan 15, 2018 at 3:15 PM, Chesnay Schepler <ches...@apache.org> wrote:

    Yes, I meant that process() would return the special operator. This
    definitely deserves a JIRA issue.


    On 15.01.2018 14:09, Juho Autio wrote:
    Thanks for the explanation. Did you mean that process() would
    return a SingleOutputWithSideOutputOperator?

    Anyway, that should be enough to avoid the problem that I hit
    (and it also seems like the best & only way).

    Maybe the name should be something more generic though, like
    ProcessedSingleOutputOperator or similar; I wouldn't know.

    Would this deserve an improvement ticket in JIRA?

    On Mon, Jan 15, 2018 at 12:43 PM, Chesnay Schepler
    <ches...@apache.org> wrote:

        It would mean that getSideOutput() would return a
        SingleOutputWithSideOutputOperator which extends
        SingleOutputOperator offering getSideOutput(). Other
        transformations would still return a SingleOutputOperator.

        With this, the following code wouldn't compile:

        stream
            .process(...)
            .filter(...)
            .getSideOutput(...) // compile error

        You would have to explicitly define the code as below, which
        makes the behavior unambiguous:

        processed = stream
            .process(...)

        filtered = processed
            .filter(...)

        filteredSideOutput = processed
            .getSideOutput(...)
            .filter(...)
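A minimal, self-contained sketch of the type hierarchy proposed above, in plain Java with no Flink dependency. The class names follow the hypothetical ones from this message (SingleOutputOperator, SingleOutputWithSideOutputOperator); everything else (the List-backed elements, the Processor interface, the simplified filter/process signatures) is an assumption for illustration and not Flink's API. It only shows how the compiler, rather than the runtime, could reject getSideOutput() after filter().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

public class SideOutputTypesSketch {

    // Hypothetical emitter: main.add(v) for the main output, side.accept(tag, v) for a side output.
    interface Processor<T> {
        void process(T value, List<T> main, BiConsumer<String, T> side);
    }

    // Hypothetical base type: ordinary transformations, but no getSideOutput().
    static class SingleOutputOperator<T> {
        final List<T> elements;

        SingleOutputOperator(List<T> elements) {
            this.elements = elements;
        }

        // Returns the base type, so its result cannot offer getSideOutput().
        SingleOutputOperator<T> filter(Predicate<T> predicate) {
            List<T> out = new ArrayList<>();
            for (T t : elements) {
                if (predicate.test(t)) {
                    out.add(t);
                }
            }
            return new SingleOutputOperator<>(out);
        }

        // The only transformation whose result type offers getSideOutput().
        SingleOutputWithSideOutputOperator<T> process(Processor<T> processor) {
            List<T> main = new ArrayList<>();
            Map<String, List<T>> sides = new HashMap<>();
            for (T t : elements) {
                processor.process(t, main,
                        (tag, v) -> sides.computeIfAbsent(tag, k -> new ArrayList<>()).add(v));
            }
            return new SingleOutputWithSideOutputOperator<>(main, sides);
        }
    }

    static class SingleOutputWithSideOutputOperator<T> extends SingleOutputOperator<T> {
        private final Map<String, List<T>> sideOutputs;

        SingleOutputWithSideOutputOperator(List<T> main, Map<String, List<T>> sideOutputs) {
            super(main);
            this.sideOutputs = sideOutputs;
        }

        // Elements this process() step emitted under the given tag (empty if none).
        SingleOutputOperator<T> getSideOutput(String tag) {
            return new SingleOutputOperator<>(sideOutputs.getOrDefault(tag, new ArrayList<>()));
        }
    }

    public static void main(String[] args) {
        SingleOutputOperator<String> stream = new SingleOutputOperator<>(Arrays.asList("a", "b"));

        SingleOutputWithSideOutputOperator<String> processed = stream.process((s, main, side) -> {
            if ("a".equals(s)) {
                main.add(s);
            } else {
                side.accept("side-output", s);
            }
        });

        // Would not compile: filter() returns SingleOutputOperator, which lacks getSideOutput().
        // processed.filter(s -> true).getSideOutput("side-output");

        SingleOutputOperator<String> filtered = processed.filter(s -> true);
        SingleOutputOperator<String> filteredSideOutput =
                processed.getSideOutput("side-output").filter(s -> true);

        System.out.println("main: " + filtered.elements);           // main: [a]
        System.out.println("side: " + filteredSideOutput.elements); // side: [b]
    }
}
```

The design choice is simply that only process() returns the richer type, so the ambiguous chain is a type error while the explicit two-branch version still compiles.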


        On 15.01.2018 09:55, Juho Autio wrote:
        > side output might deserve a separate class which inherits
        > from single output. It might prevent a lot of confusion.

        Thanks, but how could that be done? Do you mean that if one
        calls .process(), then the stream would change to another
        class which would only allow calls like .getMainOutput() or
        .getSideOutput("name")? Of course, a compile-time error would
        be even better than a runtime error, but I don't yet see how
        it could be done in practice.

        On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin
        <qinnc...@gmail.com> wrote:

            Hi Juho,

            I think side output might deserve a separate class which
            inherits from single output. It might prevent a lot of
            confusion. A more generic question is whether the
            DataStream API could natively support multiple inputs and
            multiple outputs. It becomes more of a scheduling problem
            when you go from a single-process system to a
            multi-process system: which process should get resources,
            and how much, when they share the same hardware. I guess
            it will open the gate to lots of edge cases -> strategies
            -> more edge cases :)

            Chen

            On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio
            <juho.au...@rovio.com> wrote:

                Maybe I could express it in a slightly different
                way: if adding the .filter() after .process() causes
                the side output to be somehow totally "lost", then I
                believe .getSideOutput() could detect that there is
                no such side output to listen to from upstream, and
                throw an exception. I mean, this should be detectable
                when building the DAG; it shouldn't require starting
                the stream, should it?
                Thanks.
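A minimal sketch of the build-time check proposed above, in plain Java and under one loud assumption: that every node in the graph declares up front which side-output tags it may emit. Flink operators do not expose such a declaration (an operator cannot know in advance which tags a user function will emit), so Node, its declaredSideTags field, filter() and this getSideOutput() are all hypothetical, not Flink's API. Given that assumption, the error surfaces while the graph is being built, before anything executes.

```java
import java.util.Collections;
import java.util.Set;

public class BuildTimeSideOutputCheck {

    // Hypothetical graph node that declares, at build time, the side-output tags it may emit.
    // This declaration is the assumption that makes the check possible.
    static class Node {
        final String name;
        final Set<String> declaredSideTags;

        Node(String name, Set<String> declaredSideTags) {
            this.name = name;
            this.declaredSideTags = declaredSideTags;
        }

        // Fails while wiring the graph, before any job is started.
        Node getSideOutput(String tag) {
            if (!declaredSideTags.contains(tag)) {
                throw new IllegalStateException(
                        "Operator '" + name + "' does not emit side output '" + tag + "'");
            }
            return new Node(name + ":" + tag, Collections.emptySet());
        }

        // A filter only forwards its main input, so it declares no side outputs.
        Node filter() {
            return new Node(name + "-filter", Collections.emptySet());
        }
    }

    public static void main(String[] args) {
        Node processed = new Node("process", Collections.singleton("side-output"));
        processed.getSideOutput("side-output"); // accepted: process declares this tag
        try {
            processed.filter().getSideOutput("side-output");
        } catch (IllegalStateException e) {
            // Rejected: Operator 'process-filter' does not emit side output 'side-output'
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```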

                On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai
                <tzuli...@apache.org> wrote:

                    Hi Juho,

                    > Now that I think of it this seems like a bug to
                    > me: why does the call to getSideOutput succeed
                    > if it doesn't provide _any_ input?

                    With the way side outputs work, I don’t think
                    this is possible (or would make sense). An
                    operator does not know whether or not it would
                    ever emit some element with a given tag.
                    As far as I understand it, calling
                    `getSideOutput` essentially adds a virtual node
                    to the result stream graph that listens to the
                    specified tag from the upstream input.
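To make this concrete, here is a toy model in plain Java (not Flink's actual runtime): each operator owns only the side outputs it emitted itself, and a side-output lookup reads the tag from the single operator it is attached to. Because the pass-through filter never emits anything under "side-output", reading that tag from the filter yields nothing, which matches the repro in which only "a" is printed. The Operator class and the process/filterAll helpers are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SideOutputLookupSketch {

    // Toy operator: a main output plus the side outputs that THIS operator emitted.
    static class Operator {
        final List<String> main = new ArrayList<>();
        final Map<String, List<String>> side = new HashMap<>();

        void emitSide(String tag, String value) {
            side.computeIfAbsent(tag, k -> new ArrayList<>()).add(value);
        }

        // Like getSideOutput(tag): looks only at this operator's own tagged output,
        // never at tags emitted further upstream.
        List<String> getSideOutput(String tag) {
            return side.getOrDefault(tag, Collections.emptyList());
        }
    }

    // Mirrors the ProcessFunction in the repro: "a" to main, everything else to the side output.
    static Operator process(List<String> input) {
        Operator op = new Operator();
        for (String s : input) {
            if ("a".equals(s)) {
                op.main.add(s);
            } else {
                op.emitSide("side-output", s);
            }
        }
        return op;
    }

    // Mirrors the pass-through FilterFunction: it only ever sees the upstream MAIN output.
    static Operator filterAll(Operator upstream) {
        Operator op = new Operator();
        op.main.addAll(upstream.main);
        return op;
    }

    public static void main(String[] args) {
        Operator processed = process(Arrays.asList("a", "b"));
        Operator filtered = filterAll(processed);

        System.out.println(filtered.main);                          // [a]
        System.out.println(filtered.getSideOutput("side-output"));  // []
        System.out.println(processed.getSideOutput("side-output")); // [b]
    }
}
```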

                    While I’m not sure whether your observation is
                    expected behavior, from an API perspective I can
                    see why it is a bit confusing for you.
                    Aljoscha would be the expert here; maybe he’ll
                    have more insights. I’ve cc’ed him.

                    Cheers,
                    Gordon


                    On 12 January 2018 at 4:05:13 PM, Juho Autio
                    (juho.au...@rovio.com) wrote:

                    When I run the code below (Flink 1.4.0 or
                    1.3.1), only "a" is printed. If I switch the
                    position of .process() & .filter() (i.e. filter
                    first, then process), both "a" & "b" are
                    printed, as expected.

                    I guess it's a bit hard to say what the side
                    output should include in this case: the stream
                    before filtering or after it?

                    What I would suggest is that Flink protect against
                    this kind of user error, which is hard to debug.
                    Would it be possible for Flink to throw an
                    exception if one tries to call .getSideOutput()
                    on anything that doesn't actually provide that
                    side output? Now that I think of it, this seems
                    like a bug to me: why does the call to
                    getSideOutput succeed if it doesn't provide _any_
                    input? I would expect it to get the side output
                    data stream _before_ applying .filter().

                    import org.apache.flink.api.common.functions.FilterFunction;
                    import org.apache.flink.streaming.api.datastream.DataStreamSource;
                    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
                    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                    import org.apache.flink.streaming.api.functions.ProcessFunction;
                    import org.apache.flink.util.Collector;
                    import org.apache.flink.util.OutputTag;

                    public class SideOutputProblem {

                        public static void main(String[] args) throws Exception {
                            StreamExecutionEnvironment env =
                                    StreamExecutionEnvironment.getExecutionEnvironment();
                            DataStreamSource<String> stream = env.fromElements("a", "b");
                            OutputTag<String> sideOutputTag = new OutputTag<String>("side-output"){};

                            SingleOutputStreamOperator<String> processed = stream
                                    .process(new ProcessFunction<String, String>() {
                                        @Override
                                        public void processElement(String s, Context context,
                                                Collector<String> collector) throws Exception {
                                            // "a" goes to the main output, everything else to the side output
                                            if ("a".equals(s)) {
                                                collector.collect(s);
                                            } else {
                                                context.output(sideOutputTag, s);
                                            }
                                        }
                                    })
                                    // pass-through filter: keeps every main-output element
                                    .filter(new FilterFunction<String>() {
                                        @Override
                                        public boolean filter(String s) throws Exception {
                                            return true;
                                        }
                                    });

                            // getSideOutput() is called on the filter's result, not on process()'s result
                            processed.getSideOutput(sideOutputTag).printToErr();

                            processed.print();

                            env.execute();
                        }
                    }

                    Cheers,
                    Juho
