Daniel Bali created FLINK-1986:
----------------------------------

             Summary: Group by fails on iterative data streams
                 Key: FLINK-1986
                 URL: https://issues.apache.org/jira/browse/FLINK-1986
             Project: Flink
          Issue Type: Bug
          Components: Streaming
            Reporter: Daniel Bali


Hello!

When I run a `groupBy` on an IterativeDataStream, I get a
NullPointerException. Here is the code that reproduces the issue:

{code}
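import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {

    public Test() throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();

        // Edges (v, v + 1) for v = 0..7
        DataStream<Tuple2<Long, Long>> edges = env
                .generateSequence(0, 7)
                .map(new MapFunction<Long, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> map(Long v) throws Exception {
                        return new Tuple2<>(v, v + 1);
                    }
                });

        IterativeDataStream<Tuple2<Long, Long>> iteration = edges.iterate();

        // groupBy is applied to the IterativeDataStream itself
        SplitDataStream<Tuple2<Long, Long>> step = iteration.groupBy(1)
                .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
                        return tuple; // identity map
                    }
                })
                .split(new OutputSelector<Tuple2<Long, Long>>() {
                    @Override
                    public Iterable<String> select(Tuple2<Long, Long> tuple) {
                        // Route every record to the "iterate" output
                        List<String> output = new ArrayList<>();
                        output.add("iterate");
                        return output;
                    }
                });

        // The NullPointerException is thrown here (StreamGraph.addIterationTail)
        iteration.closeWith(step.select("iterate"));

        env.execute("Sandbox");
    }

    public static void main(String[] args) throws Exception {
        new Test();
    }
}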
{code}

Moving the `groupBy` before the iteration avoids the issue; for example, this works:

{code}
... iteration = edges.groupBy(1).iterate();
iteration.map(...)
{code}

Here is the stack trace:

{code}
Exception in thread "main" java.lang.NullPointerException
        at org.apache.flink.streaming.api.graph.StreamGraph.addIterationTail(StreamGraph.java:207)
        at org.apache.flink.streaming.api.datastream.IterativeDataStream.closeWith(IterativeDataStream.java:72)
        at org.apache.flink.graph.streaming.example.Test.<init>(Test.java:73)
        at org.apache.flink.graph.streaming.example.Test.main(Test.java:79)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
