I tried this in Flink 1.10.0 :

    @Test
    public void experimentalTest() throws Exception {
        final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("One", "Two");
//        DataStream<String> input = env.addSource(new StringSourceFunction());
        List<String> result = new ArrayList<>(5);
        DataStreamUtils.collect(input).forEachRemaining(result::add);
        env.execute("Flink Streaming Java API Skeleton");
    }


Results in


java.lang.IllegalStateException: No operators defined in streaming
topology. Cannot execute.

        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
        at 
nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.experimentalTest(TestUserAgentAnalysisMapperInline.java:177)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

...



On Fri, Feb 21, 2020 at 1:00 PM Robert Metzger <rmetz...@apache.org> wrote:

> Hey Niels,
>
> This minimal Flink job executes in Flink 1.10:
>
> public static void main(String[] args) throws Exception {
>    final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>    DataStream<String> input = env.addSource(new StringSourceFunction());
>    List<String> result = new ArrayList<>(5);
>    DataStreamUtils.collect(input).forEachRemaining(result::add);
>    env.execute("Flink Streaming Java API Skeleton");
> }
>
> Maybe the TestUserAgentAnalysisMapperInline class is doing some magic
> that breaks with the StreamGraphGenerator?
>
> Best,
> Robert
>
> On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes <ni...@basjes.nl> wrote:
>
>> Hi Gordon,
>>
>> Thanks. This works for me.
>>
>> I find it strange that when I do this it works (I made the differences
>> bold)
>>
>> List<TestRecord> result = new ArrayList<>(5);
>>
>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>
>> *resultDataStream.print();*
>>
>> environment.execute();
>>
>>
>> how ever this does not work
>>
>> List<TestRecord> result = new ArrayList<>(5);
>>
>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>
>> environment.execute();
>>
>>
>> and this also does not work
>>
>> *resultDataStream.print();*
>>
>> List<TestRecord> result = new ArrayList<>(5);
>>
>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>
>> environment.execute();
>>
>>
>> In both these cases it fails with
>>
>>
>> java.lang.IllegalStateException: *No operators defined in streaming
>> topology. Cannot execute.*
>>
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
>> at
>> nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144)
>>
>>
>>
>> Did I do something wrong?
>> Is this a bug in the DataStreamUtils ?
>>
>> Niels Basjes
>>
>>
>>
>> On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai <tzuli...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> To collect the elements of a DataStream (usually only meant for testing
>>> purposes), you can take a look at `DataStreamUtils#collect(DataStream)`.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Reply via email to