Maybe I could express it in a slightly different way: if adding the
.filter() after .process() causes the side output to be somehow totally
"lost", then I believe .getSideOutput() could detect that there is no such
side output to listen to from upstream, and throw an exception. I mean, this
should be detectable while building the DAG; it shouldn't require starting
the stream, should it? Thanks.
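A minimal sketch of the build-time check proposed above. It is a hypothetical toy model in plain Java, not Flink code: the `EagerSideOutputCheck.Node` class and its `declaredTags` set are invented for illustration, and Flink has no such API. The check can only fail fast because every node declares its side-output tags up front. Real Flink operators make no such declaration, which is the obstacle raised in the reply below.

```java
import java.util.Set;

// Hypothetical toy model, NOT Flink's API: each node declares up front which
// side-output tags it may emit, so getSideOutput() can be validated while the
// graph is being built. Flink operators carry no such declaration.
class EagerSideOutputCheck {

    static final class Node {
        final String name;
        final Set<String> declaredTags; // tags this node promises it may emit

        Node(String name, Set<String> declaredTags) {
            this.name = name;
            this.declaredTags = Set.copyOf(declaredTags);
        }

        // Adds a virtual side-output node, or fails fast if the tag was never declared.
        Node getSideOutput(String tag) {
            if (!declaredTags.contains(tag)) {
                throw new IllegalStateException(
                        "'" + name + "' declares no side output '" + tag + "'");
            }
            return new Node(name + "#" + tag, Set.of());
        }
    }

    public static void main(String[] args) {
        Node process = new Node("process", Set.of("side-output"));
        Node filter = new Node("filter", Set.of()); // a filter never emits side outputs

        System.out.println(process.getSideOutput("side-output").name);
        try {
            filter.getSideOutput("side-output"); // rejected before anything runs
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Running `main` prints `process#side-output`, then the rejection message for the filter node.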

On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Juho,
>
> > Now that I think of it, this seems like a bug to me: why does the call to
> > getSideOutput succeed if it doesn't provide _any_ input?
>
>
> With the way side outputs work, I don’t think this is possible (or that it
> would make sense). An operator does not know whether it will ever emit an
> element with a given tag.
> As far as I understand it, calling `getSideOutput` essentially adds a
> virtual node to the resulting stream graph that listens to the specified tag
> on its direct upstream input, i.e., the operator `getSideOutput` is called on.
>
> While I’m not sure whether your observation is expected behavior, I can
> see why it is a bit confusing from an API perspective. Aljoscha would be the
> expert here; maybe he’ll have more insights. I’ve cc’ed him.
>
> Cheers,
> Gordon
>
>
> On 12 January 2018 at 4:05:13 PM, Juho Autio (juho.au...@rovio.com) wrote:
>
> When I run the code below (Flink 1.4.0 or 1.3.1), only "a" is printed. If
> I switch the order of .process() and .filter() (i.e., filter first, then
> process), both "a" and "b" are printed, as expected.
>
> I guess it's a bit hard to say what the side output should include in this
> case: the stream before filtering or after it?
>
> What I would suggest is that Flink protect against this kind of user error,
> which is hard to debug. Would it be possible for Flink to throw an exception
> if one calls .getSideOutput() on anything that doesn't actually provide that
> side output? Now that I think of it, this seems like a bug to me: why does
> the call to getSideOutput succeed if it doesn't provide _any_ input? I would
> expect it to get the side output data stream _before_ .filter() is applied.
>
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.OutputTag;
>
> public class SideOutputProblem {
>
>     public static void main(String[] args) throws Exception {
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         DataStreamSource<String> stream = env.fromElements("a", "b");
>         OutputTag<String> sideOutputTag = new OutputTag<String>("side-output"){};
>
>         SingleOutputStreamOperator<String> processed = stream
>
>                 .process(new ProcessFunction<String, String>() {
>                     @Override
>                     public void processElement(String s, Context context,
>                             Collector<String> collector) throws Exception {
>                         if ("a".equals(s)) {
>                             collector.collect(s);
>                         } else {
>                             context.output(sideOutputTag, s);
>                         }
>                     }
>                 })
>
>                 .filter(new FilterFunction<String>() {
>                     @Override
>                     public boolean filter(String s) throws Exception {
>                         return true;
>                     }
>                 });
>
>         processed.getSideOutput(sideOutputTag).printToErr();
>
>         processed.print();
>
>         env.execute();
>     }
>
> }
>
> Cheers,
> Juho
>
>
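In Flink terms, the usual way around this is to keep a reference to the `SingleOutputStreamOperator` returned by `.process(...)`, call `getSideOutput(sideOutputTag)` on that reference, and chain `.filter(...)` off the same reference for the main output. The toy model below does not use Flink at all. The `SideOutputModel` class and its `Op`, `process` and `filter` members are hypothetical stand-ins. It sketches why the original ordering drops "b": side outputs are stored per operator, a downstream filter only receives its upstream's main output, and the side-output lookup reads the tag from the operator it is called on.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Toy model, NOT Flink's API: each operator keeps its own side outputs keyed by
// tag, and a downstream operator only ever receives the upstream MAIN output.
class SideOutputModel {

    static final class Op {
        final List<String> main = new ArrayList<>();
        final Map<String, List<String>> side = new HashMap<>();

        // Mirrors the idea of getSideOutput: read the tag from THIS operator only.
        List<String> getSideOutput(String tag) {
            return side.getOrDefault(tag, List.of());
        }
    }

    // Like the ProcessFunction in the thread: "a" goes to the main output,
    // everything else goes to the side output under the given tag.
    static Op process(List<String> input, String tag) {
        Op op = new Op();
        for (String s : input) {
            if ("a".equals(s)) {
                op.main.add(s);
            } else {
                op.side.computeIfAbsent(tag, k -> new ArrayList<>()).add(s);
            }
        }
        return op;
    }

    // A filter consumes only the upstream main output and emits no side outputs.
    static Op filter(Op upstream, Predicate<String> keep) {
        Op op = new Op();
        for (String s : upstream.main) {
            if (keep.test(s)) {
                op.main.add(s);
            }
        }
        return op;
    }

    public static void main(String[] args) {
        String tag = "side-output";
        Op processed = process(List.of("a", "b"), tag);
        Op filtered = filter(processed, s -> true);

        System.out.println(filtered.getSideOutput(tag));  // tag read from the filter
        System.out.println(processed.getSideOutput(tag)); // tag read from process
        System.out.println(filtered.main);                // main output after filtering
    }
}
```

Running `main` prints `[]`, `[b]` and `[a]`: the tag looked up on the filter node is empty, while the same tag looked up on the process node holds "b".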
