I have two streams. One produces a single record, and the other produces a list of records. I want to do a left join. For example:

    Stream A:
    record1
    record2
    ...

    Stream B:
    single-record

    After the join:
    record1, single-record
    record2, single-record
    ...

However, the following streaming job throws the exception 'Record has Long.MIN_VALUE timestamp (= no timestamp marker) ...' even though setStreamTimeCharacteristic is set to ProcessingTime and assignTimestampsAndWatermarks is called. How can I fix this runtime exception? Thanks.

    object App {
      def main(args: Array[String]) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

        // Left stream: several records, with timestamps and watermarks assigned
        val left = env.fromElements(1, 2, 3, 4, 5).map(
          new MapFunction[Int, T2[Int, String]] {
            override def map(value: Int): T2[Int, String] =
              new T2[Int, String](value, "data 1")
          }
        ).assignTimestampsAndWatermarks(new MyTimestampExtractor)

        // Right stream: a single record, no timestamps assigned
        val right = env.fromElements(99).map(
          new MapFunction[Int, T2[Int, String]] {
            override def map(value: Int): T2[Int, String] =
              new T2[Int, String](value, "data 2")
          }
        )

        left.coGroup(right)
          .where { t2 => t2.f0 }
          .equalTo { t2 => t2.f0 }
          .window(TumblingEventTimeWindows.of(Time.seconds(1)))
          .apply(new Join()).print

        env.execute
      }
    }

    class MyTimestampExtractor extends AssignerWithPeriodicWatermarks[T2[Int, String]] with Serializable {
      override def extractTimestamp(e: T2[Int, String], prevElementTimestamp: Long) =
        System.currentTimeMillis

      override def getCurrentWatermark(): Watermark =
        new Watermark(System.currentTimeMillis)
    }

    class Join extends CoGroupFunction[
      T2[Int, String], T2[Int, String], T2[Int, String]
    ] {
      val log = LoggerFactory.getLogger(classOf[Join])

      override def coGroup(left: java.lang.Iterable[T2[Int, String]],
                           right: java.lang.Iterable[T2[Int, String]],
                           out: Collector[T2[Int, String]]) {
        // Collect the elements of both sides, then emit them all
        var seq = Seq.empty[T2[Int, String]]
        left.foreach { e => log.info(s"from left: $e"); seq ++= Seq(e) }
        right.foreach { e => log.info(s"from right: $e"); seq ++= Seq(e) }
        seq.foreach { e => out.collect(e) }
      }
    }
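
To make the intended result concrete, the pairing described at the top can be sketched with plain Scala collections, without Flink. This is only an illustration of the output I expect, not the streaming job itself; `ExpectedJoinSketch`, `pairWithSingle`, and the sample values are made-up names for this sketch.

```scala
// Plain Scala collections only; no Flink involved.
// All names here are hypothetical and exist only for illustration.
object ExpectedJoinSketch {

  // Pair every record of the multi-record side with the single record
  // from the other side.
  def pairWithSingle[A, B](records: Seq[A], single: B): Seq[(A, B)] =
    records.map(r => (r, single))

  def main(args: Array[String]): Unit = {
    val streamA = Seq("record1", "record2", "record3") // stands in for Stream A
    val streamB = "single-record"                      // stands in for Stream B

    // Print one joined pair per line, e.g. "record1, single-record"
    pairWithSingle(streamA, streamB).foreach { case (a, b) => println(s"$a, $b") }
  }
}
```

Each record from Stream A should appear exactly once in the output, always next to the one record from Stream B.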