Could it be that you did not specify a timestamp extractor when you defined the ‘right’ stream? With event-time windows, records from both inputs of the coGroup need timestamps. You assign them the same way you did for the ‘left’ stream.
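A minimal sketch of what that could look like, assuming the same Flink 1.x-era Scala API as the quoted job, and assuming `T2` is an alias for Flink's Java `Tuple2` (the original imports are not shown). The object name `BothSidesTimestamped` is made up for illustration; the only substantive change from the quoted code is the extra `assignTimestampsAndWatermarks` call on `right`, together with the `EventTime` characteristic.

```scala
import org.apache.flink.api.common.functions.{CoGroupFunction, MapFunction}
import org.apache.flink.api.java.tuple.{Tuple2 => T2} // assumed alias
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Same extractor as in the quoted job: wall-clock time is used as the event timestamp.
class MyTimestampExtractor
    extends AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
  override def extractTimestamp(e: T2[Int, String], prevElementTimestamp: Long): Long =
    System.currentTimeMillis

  override def getCurrentWatermark(): Watermark =
    new Watermark(System.currentTimeMillis)
}

// Forwards every element of both groups, like the Join class in the quoted job.
class Join extends CoGroupFunction[T2[Int, String], T2[Int, String], T2[Int, String]] {
  override def coGroup(left: java.lang.Iterable[T2[Int, String]],
                       right: java.lang.Iterable[T2[Int, String]],
                       out: Collector[T2[Int, String]]): Unit = {
    left.asScala.foreach(e => out.collect(e))
    right.asScala.foreach(e => out.collect(e))
  }
}

object BothSidesTimestamped { // hypothetical name
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val left = env.fromElements(1, 2, 3, 4, 5).map(
      new MapFunction[Int, T2[Int, String]] {
        override def map(value: Int): T2[Int, String] =
          new T2[Int, String](value, "data 1")
      }
    ).assignTimestampsAndWatermarks(new MyTimestampExtractor)

    val right = env.fromElements(99).map(
      new MapFunction[Int, T2[Int, String]] {
        override def map(value: Int): T2[Int, String] =
          new T2[Int, String](value, "data 2")
      }
    ).assignTimestampsAndWatermarks(new MyTimestampExtractor) // the missing call

    left.coGroup(right)
      .where { t2 => t2.f0 }
      .equalTo { t2 => t2.f0 }
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .apply(new Join())
      .print()

    env.execute()
  }
}
```

The window operator sees records from both inputs, so a record from either side that still carries the default `Long.MIN_VALUE` timestamp would trigger the exception quoted below.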
Kostas

> On Jul 8, 2016, at 6:12 AM, David Olsen <davidolsen4...@gmail.com> wrote:
>
> Changing TimeCharacteristic to EventTime the flink still throws that
> runtime exception error. Is
> `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` the
> correct way to set that feature?
>
> Thanks.
>
> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
> timestamp marker). Is the time characteristic set to 'ProcessingTime',
> or did you forget to call
> 'DataStream.assignTimestampsAndWatermarks(...)'?
>   at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:63)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:223)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
>
>
> On 06/07/2016, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> Hi David,
>>
>> You are using Tumbling event time windows, but you set the
>> timeCharacteristic to processing time.
>> If you want processing time, then you should use
>> TumblingProcessingTimeWindows and remove the timestampAssigner.
>> If you want event time, then you need to set the timeCharacteristic to
>> eventTime and leave the rest of your code as is.
>>
>> Let me know if this answered your question.
>>
>> Cheers,
>> Kostas
>>
>>> On Jul 6, 2016, at 3:43 PM, David Olsen <davidolsen4...@gmail.com> wrote:
>>>
>>> I have two streams. One will produce a single record, and the other
>>> have a list of records. And I want to do left join. So for example,
>>>
>>> Stream A:
>>> record1
>>> record2
>>> ...
>>>
>>> Stream B:
>>> single-record
>>>
>>> After joined,
>>>
>>> record1, single-record
>>> record2, single-record
>>> ...
>>>
>>> However with the following streaming job, it throws an exception
>>> 'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...' even
>>> setStreamTimeCharacteristic is configured to ProcessingTime and
>>> assignTimestampsAndWatermarks is called.
>>>
>>> How can I fix this runtime exception?
>>>
>>> Thanks.
>>>
>>> object App {
>>>   def main(args: Array[String]) {
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>>     val left = env.fromElements(1, 2, 3, 4, 5).map(
>>>       new MapFunction[Int, T2[Int, String]] {
>>>         override def map(value: Int): T2[Int, String] =
>>>           new T2[Int, String](value, "data 1")
>>>       }
>>>     ).assignTimestampsAndWatermarks(new MyTimestampExtractor)
>>>
>>>     val right = env.fromElements(99).map(
>>>       new MapFunction[Int, T2[Int, String]] {
>>>         override def map(value: Int): T2[Int, String] =
>>>           new T2[Int, String](value, "data 2")
>>>       }
>>>     )
>>>     left.coGroup(right).
>>>       where { t2 => t2.f0 }.
>>>       equalTo{ t2=> t2.f0 }.
>>>       window(TumblingEventTimeWindows.of(Time.seconds(1))).
>>>       apply(new Join()).print
>>>     env.execute
>>>   }
>>> }
>>>
>>> class MyTimestampExtractor extends
>>>     AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
>>>   override def extractTimestamp(e: T2[Int, String],
>>>                                 prevElementTimestamp: Long) =
>>>     System.currentTimeMillis
>>>
>>>   override def getCurrentWatermark(): Watermark =
>>>     new Watermark(System.currentTimeMillis)
>>> }
>>>
>>> class Join extends CoGroupFunction[
>>>   T2[Int, String], T2[Int, String], T2[Int, String]
>>> ] {
>>>   val log = LoggerFactory.getLogger(classOf[Join])
>>>   override def coGroup(left: java.lang.Iterable[T2[Int, String]],
>>>                        right: java.lang.Iterable[T2[Int, String]],
>>>                        out: Collector[T2[Int, String]]) {
>>>     var seq = Seq.empty[T2[Int, String]]
>>>     left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) }
>>>     right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) }
>>>     seq.foreach { e => out.collect(e) }
>>>   }
>>>
>>> }
>>
>>