Hey Niels,

This minimal Flink job executes in Flink 1.10:

public static void main(String[] args) throws Exception {
   final StreamExecutionEnvironment env =
         StreamExecutionEnvironment.getExecutionEnvironment();
   DataStream<String> input = env.addSource(new StringSourceFunction());
   List<String> result = new ArrayList<>(5);
   DataStreamUtils.collect(input).forEachRemaining(result::add);
   env.execute("Flink Streaming Java API Skeleton");
}

Maybe the TestUserAgentAnalysisMapperInline class is doing some magic that
breaks with the StreamGraphGenerator?
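
Another possibility, which is an assumption and not something checked against the Flink 1.10 sources in this thread: DataStreamUtils.collect() might itself trigger an execution, and the environment might forget its registered operators after each execution. The toy model below is plain Java, not Flink code, and every name in it (ToyEnvironment, runs, and so on) is hypothetical. It only shows that, under those two assumptions, an execute() call that follows collect() has nothing left to run unless another operator such as print() is added in between.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for an execution environment. This is NOT Flink code. */
class ToyEnvironment {
    private final List<String> operators = new ArrayList<>();

    void addOperator(String name) {
        operators.add(name);
    }

    /** Adds a print sink without executing anything. */
    void print() {
        addOperator("print-sink");
    }

    /** ASSUMPTION: executing consumes the registered operators. */
    void execute() {
        if (operators.isEmpty()) {
            throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
        }
        operators.clear();
    }

    /** ASSUMPTION: collect registers a sink and executes right away. */
    void collect() {
        addOperator("collect-sink");
        execute();
    }

    /** Returns true if the final execute() call finds operators to run. */
    static boolean runs(boolean printBefore, boolean printAfter) {
        ToyEnvironment env = new ToyEnvironment();
        env.addOperator("source");
        if (printBefore) {
            env.print();
        }
        env.collect();
        if (printAfter) {
            env.print();
        }
        try {
            env.execute();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("collect, print, execute -> " + runs(false, true));
        System.out.println("collect, execute        -> " + runs(false, false));
        System.out.println("print, collect, execute -> " + runs(true, false));
    }
}
```

In this model only the first ordering reaches the final execute() with an operator still registered, which lines up with the three cases quoted below. If the model holds, dropping the trailing execute() call after collect() would be the thing to try.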

Best,
Robert

On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes <ni...@basjes.nl> wrote:

> Hi Gordon,
>
> Thanks. This works for me.
>
> I find it strange that this works (I made the differences bold):
>
> List<TestRecord> result = new ArrayList<>(5);
>
> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>
> *resultDataStream.print();*
>
> environment.execute();
>
>
> However, this does not work:
>
> List<TestRecord> result = new ArrayList<>(5);
>
> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>
> environment.execute();
>
>
> And this also does not work:
>
> *resultDataStream.print();*
>
> List<TestRecord> result = new ArrayList<>(5);
>
> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>
> environment.execute();
>
>
> In both of these cases it fails with:
>
>
> java.lang.IllegalStateException: *No operators defined in streaming topology. Cannot execute.*
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
>     at nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144)
>
>
>
> Did I do something wrong?
> Is this a bug in DataStreamUtils?
>
> Niels Basjes
>
>
>
> On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai <tzuli...@gmail.com> wrote:
>
>> Hi,
>>
>> To collect the elements of a DataStream (usually only meant for testing
>> purposes), you can take a look at `DataStreamUtils#collect(DataStream)`.
>>
>> Cheers,
>> Gordon
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
