On Thu, May 24, 2018 at 9:20 AM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> Off the top of my head I can imagine two solutions:
>>
>> 1. Override the default behaviour of the operator via for example
>> org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
>>
>
> That seems the safer, but more complicated path.
>

As we had already implemented the business logic in
a RichCoFlatMapFunction, I ended up extending CoStreamFlatMap:

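import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap
import org.apache.flink.streaming.api.watermark.Watermark

class SingleWatermarkCoFlatMap[IN1,IN2,OUT](flatMapper: CoFlatMapFunction[IN1,IN2,OUT])
    extends CoStreamFlatMap[IN1,IN2,OUT](flatMapper) {

  // Pass through the watermarks from the first stream
  override def processWatermark1(mark: Watermark): Unit = processWatermark(mark)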

  // Ignore watermarks from the second stream
  override def processWatermark2(mark: Watermark): Unit = {}
}
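
For anyone reading this later, here is a rough, Flink-free sketch of why the override helps. As far as I understand it, a two-input operator by default forwards the minimum of the watermarks seen on its two inputs, so a second stream that lags or is idle holds event time back. The names below (WatermarkModel, MinOfBothInputs, FirstInputOnly, run) are made up for illustration and are not Flink classes; watermarks are modeled as plain Long timestamps.

```scala
// Illustrative model only: none of these names exist in Flink.
object WatermarkModel {

  // Models the default behaviour: emit the minimum of both inputs' watermarks,
  // and only when that minimum advances.
  class MinOfBothInputs {
    private var input1 = Long.MinValue
    private var input2 = Long.MinValue
    private var combined = Long.MinValue
    val emitted = scala.collection.mutable.ArrayBuffer.empty[Long]

    protected def emit(ts: Long): Unit = emitted += ts

    def processWatermark1(ts: Long): Unit = { input1 = ts; advance() }
    def processWatermark2(ts: Long): Unit = { input2 = ts; advance() }

    private def advance(): Unit = {
      val newMin = math.min(input1, input2)
      if (newMin > combined) { combined = newMin; emit(newMin) }
    }
  }

  // Models the override above: follow the first input, drop the second's watermarks.
  class FirstInputOnly extends MinOfBothInputs {
    override def processWatermark1(ts: Long): Unit = emit(ts)
    override def processWatermark2(ts: Long): Unit = ()
  }

  // Feed the same watermark sequence to either model and return what it emitted.
  def run(op: MinOfBothInputs): List[Long] = {
    op.processWatermark1(10L)
    op.processWatermark1(20L)
    op.processWatermark2(5L)
    op.emitted.toList
  }
}
```

Running both models on the same sequence shows the difference: the min-of-both model emits nothing until the second input reports a watermark and then stays pinned to that lower value, while the first-input-only model emits every watermark from the first input as it arrives.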


Then it was easy to replace:

stream1
      .connect(stream2)
      .flatMap( new BusinessCoFlatMapFunction(params) )
        .name("Operator")
        .uid("op")

with:

stream1
      .connect(stream2)
      .transform("Operator",
        new SingleWatermarkCoFlatMap[X,Y,Z](new BusinessCoFlatMapFunction(params)))
      .uid("op")
