On Thu, May 24, 2018 at 9:20 AM, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> From top of my head I can imagine two solutions:
>>
>> 1. Override the default behaviour of the operator via for example
>> org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
>>
>
> That seems the safer, but more complicated path.
>

As we had already implemented the business logic in a
RichCoFlatMapFunction, I ended up extending CoStreamFlatMap:

    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
    import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap
    import org.apache.flink.streaming.api.watermark.Watermark

    class SingleWatermarkCoFlatMap[IN1,IN2,OUT](flatMapper: CoFlatMapFunction[IN1,IN2,OUT])
        extends CoStreamFlatMap(flatMapper) {

      // Pass through the watermarks from the first stream
      override def processWatermark1(mark: Watermark): Unit = processWatermark(mark)

      // Ignore watermarks from the second stream
      override def processWatermark2(mark: Watermark): Unit = {}
    }

Then it was easy to replace:

    stream1
      .connect(stream2)
      .flatMap( new BusinessCoFlatMapFunction(params) )
      .name("Operator")
      .uid("op")

with:

    stream1
      .connect(stream2)
      .transform("Operator", new SingleWatermarkCoFlatMap[X,Y,Z](new BusinessCoFlatMapFunction(params)))
      .uid("op")
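
For anyone wondering why the override matters: by default a two-input operator only advances its event-time clock to the minimum of the watermarks seen on both inputs, so a second stream that emits no (or very old) watermarks holds the operator back. Below is a toy model of that behaviour in plain Scala. It is a sketch only and does not use Flink; the class names TwoInputWatermarkModel and SingleWatermarkModel are made up for illustration, and the real operator also fires timers and emits the watermark downstream, which is left out here.

```scala
// Toy model, NOT Flink code: hypothetical classes that mimic how a
// two-input operator combines the watermarks of its inputs.
class TwoInputWatermarkModel {
  protected var input1Watermark: Long = Long.MinValue
  protected var input2Watermark: Long = Long.MinValue
  protected var combinedWatermark: Long = Long.MinValue

  // The operator's current event-time clock.
  def currentWatermark: Long = combinedWatermark

  // Advance the clock, never letting it move backwards (a simplification).
  protected def advanceTo(ts: Long): Unit =
    if (ts > combinedWatermark) combinedWatermark = ts

  // Default behaviour: the clock follows the minimum of both inputs.
  def processWatermark1(ts: Long): Unit = {
    input1Watermark = ts
    advanceTo(math.min(input1Watermark, input2Watermark))
  }

  def processWatermark2(ts: Long): Unit = {
    input2Watermark = ts
    advanceTo(math.min(input1Watermark, input2Watermark))
  }
}

// Mirrors the idea of SingleWatermarkCoFlatMap: only input 1 drives the clock.
class SingleWatermarkModel extends TwoInputWatermarkModel {
  override def processWatermark1(ts: Long): Unit = advanceTo(ts)
  override def processWatermark2(ts: Long): Unit = ()
}

object WatermarkDemo extends App {
  val defaultOp = new TwoInputWatermarkModel
  val singleOp  = new SingleWatermarkModel

  // Only the first input has produced a watermark so far.
  defaultOp.processWatermark1(1000L)
  singleOp.processWatermark1(1000L)

  println(defaultOp.currentWatermark == Long.MinValue) // true: still waiting on input 2
  println(singleOp.currentWatermark)                   // 1000
}
```

With the default model the clock stays at Long.MinValue until the second input reports a watermark; with the override it follows the first input alone, which is what the operator above relies on.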