I've filed a JIRA to improve the error message: https://issues.apache.org/jira/browse/FLINK-3918
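
Separately, on what the quoted example below should print once it runs: all 33 records carry the key 0L, so they land in a single global window, and as far as I understand PurgingTrigger.of(CountTrigger.of(5)) it fires the window function on every fifth element and then clears the window. That would give six batches, with the last three records never emitted. Here is a rough plain-Java sketch of that behaviour. It is not Flink code; the class and method names (CountTriggerSketch, firings) are made up for illustration, and it joins plain strings rather than Tuple2 values.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for a count-based, purging trigger.
// Hypothetical names; nothing here is Flink API.
class CountTriggerSketch {

    // Buffers elements and emits the buffer each time it reaches maxCount,
    // then clears it (fire and purge). Leftover elements are never emitted.
    static List<List<String>> firings(int totalElements, int maxCount) {
        List<List<String>> fired = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        for (long l = 0; l < totalElements; l++) {
            buffer.add("This is " + l);
            if (buffer.size() == maxCount) {
                fired.add(new ArrayList<>(buffer)); // fire: hand a copy to the window function
                buffer.clear();                     // purge: drop the window contents
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Same sizes as the quoted example: 33 elements, trigger count 5.
        for (List<String> batch : firings(33, 5)) {
            System.out.println("!!!!!!!!! " + String.join(",", batch));
        }
    }
}
```

If the real job prints something different after the time characteristic is set, the sketch is the part to distrust, not Flink.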

On Fri, Apr 22, 2016 at 11:17 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Konstantin,
>
> this exception is thrown if you do not set the time characteristic to
> event time and assign timestamps.
> Please try to add
>
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> after you obtained the StreamExecutionEnvironment.
>
> Best, Fabian
>
> 2016-04-22 15:47 GMT+02:00 Konstantin Kulagin <kkula...@gmail.com>:
>
>> Hi guys,
>>
>> trying to run this example:
>>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStreamSource<Tuple2<Long, String>> source = env.addSource(
>>     new SourceFunction<Tuple2<Long, String>>() {
>>       @Override
>>       public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
>>         LongStream.range(0, 33).forEach(l -> {
>>           ctx.collect(Tuple2.of(0L, "This is " + l));
>>         });
>>       }
>>
>>       @Override
>>       public void cancel() {
>>       }
>>     });
>>
>> source.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()).
>> // source.
>>     keyBy(0).window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(5))).
>>     apply(new WindowFunction<Tuple2<Long, String>, Void, Tuple, GlobalWindow>() {
>>       @Override
>>       public void apply(Tuple tuple, GlobalWindow window,
>>           Iterable<Tuple2<Long, String>> input, Collector<Void> out) throws Exception {
>>         System.out.println("!!!!!!!!! " + Joiner.on(",").join(input));
>>       }
>>     });
>>
>> env.execute("yoyoyo");
>>
>> Getting Caused by: java.lang.ClassCastException:
>> org.apache.flink.streaming.api.watermark.Watermark cannot be cast to
>> org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>>   at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
>>   at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>>   at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
>>   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
>>   at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
>>   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
>>
>> - After googling I've found this:
>>   https://issues.apache.org/jira/browse/FLINK-3688
>>
>> - went to github, downloaded branch 1.0.2 which contains specified change
>>   but having the same results.
>>
>> What am I missing here?
>>
>> Thanks!
>>
>> Konstantin