Hey Niels,

This minimal Flink job executes in Flink 1.10:

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.addSource(new StringSourceFunction());

        List<String> result = new ArrayList<>(5);
        DataStreamUtils.collect(input).forEachRemaining(result::add);

        env.execute("Flink Streaming Java API Skeleton");
    }

Maybe the TestUserAgentAnalysisMapperInline class is doing some magic that breaks with the StreamGraphGenerator?

Best,
Robert

On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes <ni...@basjes.nl> wrote:

> Hi Gordon,
>
> Thanks. This works for me.
>
> I find it strange that when I do this it works (I made the differences
> bold)
>
>     List<TestRecord> result = new ArrayList<>(5);
>     DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>     *resultDataStream.print();*
>     environment.execute();
>
> how ever this does not work
>
>     List<TestRecord> result = new ArrayList<>(5);
>     DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>     environment.execute();
>
> and this also does not work
>
>     *resultDataStream.print();*
>     List<TestRecord> result = new ArrayList<>(5);
>     DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>     environment.execute();
>
> In both these cases it fails with
>
> java.lang.IllegalStateException: *No operators defined in streaming topology. Cannot execute.*
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
>     at nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144)
>
> Did I do something wrong?
> Is this a bug in the DataStreamUtils ?
>
> Niels Basjes
>
> On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai <tzuli...@gmail.com> wrote:
>
>> Hi,
>>
>> To collect the elements of a DataStream (usually only meant for testing
>> purposes), you can take a look at `DataStreamUtils#collect(DataStream)`.
>>
>> Cheers,
>> Gordon
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes