I tried this in Flink 1.10.0:

  @Test
  public void experimentalTest() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> input = env.fromElements("One", "Two");
    // DataStream<String> input = env.addSource(new StringSourceFunction());
    List<String> result = new ArrayList<>(5);
    DataStreamUtils.collect(input).forEachRemaining(result::add);
    env.execute("Flink Streaming Java API Skeleton");
  }

Results in

java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
    at nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.experimentalTest(TestUserAgentAnalysisMapperInline.java:177)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    ...

On Fri, Feb 21, 2020 at 1:00 PM Robert Metzger <rmetz...@apache.org> wrote:

> Hey Niels,
>
> This minimal Flink job executes in Flink 1.10:
>
> public static void main(String[] args) throws Exception {
>   final StreamExecutionEnvironment env =
>       StreamExecutionEnvironment.getExecutionEnvironment();
>   DataStream<String> input = env.addSource(new StringSourceFunction());
>   List<String> result = new ArrayList<>(5);
>   DataStreamUtils.collect(input).forEachRemaining(result::add);
>   env.execute("Flink Streaming Java API Skeleton");
> }
>
> Maybe the TestUserAgentAnalysisMapperInline class is doing some magic
> that breaks with the StreamGraphGenerator?
>
> Best,
> Robert
>
> On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes <ni...@basjes.nl> wrote:
>
>> Hi Gordon,
>>
>> Thanks. This works for me.
>>
>> I find it strange that when I do this it works (I made the differences
>> bold)
>>
>> List<TestRecord> result = new ArrayList<>(5);
>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>> *resultDataStream.print();*
>> environment.execute();
>>
>> however, this does not work
>>
>> List<TestRecord> result = new ArrayList<>(5);
>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>> environment.execute();
>>
>> and this also does not work
>>
>> *resultDataStream.print();*
>> List<TestRecord> result = new ArrayList<>(5);
>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>> environment.execute();
>>
>> In both these cases it fails with
>>
>> java.lang.IllegalStateException: *No operators defined in streaming
>> topology. Cannot execute.*
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
>>     at nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144)
>>
>> Did I do something wrong?
>> Is this a bug in the DataStreamUtils?
>>
>> Niels Basjes
>>
>> On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai <tzuli...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> To collect the elements of a DataStream (usually only meant for testing
>>> purposes), you can take a look at `DataStreamUtils#collect(DataStream)`.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>

--
Best regards / Met vriendelijke groeten,

Niels Basjes
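
Note on the three cases quoted above: one explanation that fits all of them, offered here as an assumption and not confirmed by anyone in this thread, is that `DataStreamUtils.collect` registers its own sink and triggers the job itself, and that `execute` forgets the registered operators once it has built the job graph. A later `execute()` would then only succeed if something was added after `collect`. The sketch below is a toy model in plain Java, not Flink code; `ToyEnvironment`, `DoubleExecuteSketch` and `secondExecuteFails` are made-up names used purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only, NOT Flink code: an environment that forgets its operators once executed. */
final class ToyEnvironment {
    private final List<String> operators = new ArrayList<>();

    /** Registers an operator, comparable to attaching a sink to a stream. */
    void addOperator(String name) {
        operators.add(name);
    }

    /** Runs the registered operators, then clears them (the assumed behaviour). */
    List<String> execute() {
        if (operators.isEmpty()) {
            throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
        }
        List<String> ran = new ArrayList<>(operators);
        operators.clear();
        return ran;
    }

    /** Stand-in for a collect helper that adds its own sink and starts the job itself. */
    List<String> collect() {
        addOperator("collect-sink");
        return execute();
    }
}

final class DoubleExecuteSketch {
    /** Returns true if an explicit execute() after collect() throws in the toy model. */
    static boolean secondExecuteFails(boolean printBefore, boolean printAfter) {
        ToyEnvironment env = new ToyEnvironment();
        if (printBefore) {
            env.addOperator("print-sink");
        }
        env.collect(); // runs everything registered so far and clears it
        if (printAfter) {
            env.addOperator("print-sink");
        }
        try {
            env.execute();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondExecuteFails(false, false)); // collect, execute
        System.out.println(secondExecuteFails(false, true));  // collect, print, execute
        System.out.println(secondExecuteFails(true, false));  // print, collect, execute
    }
}
```

In this model only the variant with `print()` placed after `collect` survives the explicit `execute()`, which is the same pattern as the three cases reported in the thread. If the model holds for the real API, dropping the explicit `execute()` call after `collect` should avoid the exception, but that is a guess that still needs checking against the Flink 1.10 sources.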