Re: Multiple stream operator watermark handling

2018-05-25 Piotr Nowojski
Great to hear that this worked out for you :)

Progression of watermarks on an empty stream is a known issue that we are
working to resolve in the future. The usually recommended workaround is to
send a custom blank event (which the operator should ignore) once in a while.
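A minimal plain-Scala sketch of that workaround (not Flink API). It assumes a hypothetical control-message type: the control source periodically emits a `Heartbeat` that carries only a timestamp, presumably so the control stream's timestamp-based watermark assigner has something to advance on, while the business logic drops it. `Control`, `ConfigUpdate`, `Heartbeat`, `ControlHandler`, `heartbeats` and `maxTimestamp` are illustrative names.

```scala
object HeartbeatSketch {
  // Hypothetical control-message type: real updates plus a blank heartbeat.
  sealed trait Control { def timestamp: Long }
  final case class ConfigUpdate(timestamp: Long, key: String, value: String) extends Control
  final case class Heartbeat(timestamp: Long) extends Control

  // Business-side state for the control input. Heartbeats exist only to carry
  // an event timestamp for watermark generation, so they are ignored here.
  final class ControlHandler {
    private var config = Map.empty[String, String]

    def onControl(msg: Control): Unit = msg match {
      case ConfigUpdate(_, k, v) => config += (k -> v)
      case Heartbeat(_)          => () // blank event: nothing to do
    }

    def lookup(key: String): Option[String] = config.get(key)
  }

  // Blank events a (hypothetical) control source could emit every intervalMs.
  def heartbeats(fromMs: Long, toMs: Long, intervalMs: Long): Seq[Heartbeat] =
    (fromMs to toMs by intervalMs).map(Heartbeat(_))

  // Highest event timestamp seen on the control input; a timestamp-based
  // assigner on that input could not advance its watermark beyond this.
  def maxTimestamp(msgs: Seq[Control]): Option[Long] =
    if (msgs.isEmpty) None else Some(msgs.map(_.timestamp).max)
}
```

Keeping heartbeats as a distinct case of a sealed type means every handler of `Control` has to decide explicitly what to do with them, so they cannot leak into the business logic by accident.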

I have expanded the documentation:
https://github.com/apache/flink/pull/6076 

Please check it, and if you have any further suggestions, you are welcome to
comment in the PR. I hope it clarifies the behaviour.

Piotrek

> On 25 May 2018, at 00:03, Elias Levy wrote:



Re: Multiple stream operator watermark handling

2018-05-24 Elias Levy
On Thu, May 24, 2018 at 9:20 AM, Elias Levy wrote:

> On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski wrote:
>
>> Off the top of my head I can imagine two solutions:
>>
>> 1. Override the default behaviour of the operator, for example via
>> org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
>>
>
> That seems the safer, but more complicated path.
>

As we had already implemented the business logic in
a RichCoFlatMapFunction, I ended up extending CoStreamFlatMap:

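import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap
import org.apache.flink.streaming.api.watermark.Watermark

class SingleWatermarkCoFlatMap[IN1, IN2, OUT](flatMapper: CoFlatMapFunction[IN1, IN2, OUT])
    extends CoStreamFlatMap[IN1, IN2, OUT](flatMapper) {

  // Pass through the watermarks from the first stream unchanged; the inherited
  // processWatermark advances event-time timers and forwards the watermark downstream
  override def processWatermark1(mark: Watermark): Unit = processWatermark(mark)

  // Ignore watermarks from the second stream
  override def processWatermark2(mark: Watermark): Unit = {}
}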


Then it was easy to replace:

stream1
  .connect(stream2)
  .flatMap(new BusinessCoFlatMapFunction(params))
  .name("Operator")
  .uid("op")

with:

stream1
  .connect(stream2)
  .transform("Operator",
    new SingleWatermarkCoFlatMap[X, Y, Z](new BusinessCoFlatMapFunction(params)))
  .uid("op")
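To see why the override unblocks event time, here is a plain-Scala model (not Flink code) of the two watermark paths. It assumes the default rule described in the original question: remember the last watermark per input (initially Long.MinValue) and forward the minimum whenever it advances. `WatermarkCombiner` and `SingleWatermarkCombiner` are illustrative names; the subclass mirrors SingleWatermarkCoFlatMap by forwarding input-1 watermarks directly and dropping input-2 watermarks.

```scala
// Modelled default behaviour: forward min(last watermark of input 1, of input 2)
class WatermarkCombiner {
  private var emitted: List[Long] = Nil
  private var in1 = Long.MinValue
  private var in2 = Long.MinValue
  private var combined = Long.MinValue

  // Stands in for the operator forwarding a watermark downstream
  protected def emit(ts: Long): Unit = emitted = emitted :+ ts

  def processWatermark1(ts: Long): Unit = { in1 = ts; advance() }
  def processWatermark2(ts: Long): Unit = { in2 = ts; advance() }

  // Emit the combined watermark only when the minimum actually increases
  private def advance(): Unit = {
    val m = math.min(in1, in2)
    if (m > combined) { combined = m; emit(m) }
  }

  def output: List[Long] = emitted
}

// Modelled override: pass input-1 watermarks straight through, ignore input 2
class SingleWatermarkCombiner extends WatermarkCombiner {
  override def processWatermark1(ts: Long): Unit = emit(ts)
  override def processWatermark2(ts: Long): Unit = ()
}
```

In the model the override skips the "only if it increased" check, so it relies on the first input's watermarks never going backwards.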


Re: Multiple stream operator watermark handling

2018-05-24 Elias Levy
On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski wrote:

> Off the top of my head I can imagine two solutions:
>
> 1. Override the default behaviour of the operator, for example via
> org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
>

That seems the safer, but more complicated path.


> 2. Can you set the control stream’s watermark to Watermark#MAX_WATERMARK,
> or maybe Watermark#MAX_WATERMARK - 1?
>

That seems simpler, but potentially perilous if at some point in the future
there were some use for control stream watermarks. Also, would it work if
there are no messages in the control stream? Wouldn't that mean no
watermark would be emitted, even if it were hardcoded to Long.MAX_VALUE?
In that case the operator's default for that input would be used, which
would still be Long.MIN_VALUE.
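The concern can be illustrated with the min rule in a few lines of plain Scala (a model, not Flink's implementation): pinning the control input at Long.MaxValue, or Long.MaxValue - 1, makes the combined watermark follow the primary input, but only once the control input has actually reported that value; until then its slot holds the initial Long.MinValue and the minimum stays stuck. The `Option` encoding of "nothing received yet" and the names `slot` and `combined` are assumptions of the sketch.

```scala
object MaxWatermarkSketch {
  // Watermark an input contributes: its last reported value, or the
  // initial Long.MinValue if it has not reported anything yet.
  def slot(last: Option[Long]): Long = last.getOrElse(Long.MinValue)

  // Modelled default rule for a two-input operator: the minimum of both slots.
  def combined(primary: Option[Long], control: Option[Long]): Long =
    math.min(slot(primary), slot(control))
}
```

For example, `combined(Some(1000L), Some(Long.MaxValue))` is 1000, while `combined(Some(1000L), None)` is Long.MinValue.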


BTW, this reminds me of an issue I've mentioned previously: the
documentation lacks a description of how watermarks are processed by
operators. E.g., when does a window emit watermarks? What watermarks does
it emit? That seems like a rather large omission, as one of the main
features of Flink is event time processing, which puts watermarks on almost
equal footing with data and data operations. Just as the docs describe how
data is processed, merged, etc., they should describe the same for watermarks.


Re: Multiple stream operator watermark handling

2018-05-24 Piotr Nowojski
Hi,

Off the top of my head I can imagine two solutions:

1. Override the default behaviour of the operator, for example via
org.apache.flink.streaming.api.datastream.ConnectedStreams#transform

2. Can you set the control stream’s watermark to Watermark#MAX_WATERMARK, or
maybe Watermark#MAX_WATERMARK - 1?

Piotrek

> On 24 May 2018, at 16:07, Elias Levy wrote:



Multiple stream operator watermark handling

2018-05-24 Elias Levy
Is there a mechanism for a multiple stream operator to ignore watermarks from
one of the streams?

The use case is a multiple stream operator that consumes a primary stream
and a secondary control stream. The control stream may receive messages
only on rare occasions, and possibly never. The default behavior of the
operator is to emit only the lowest of the last watermarks received from
each input stream. That means that event time fails to advance if there
are no control messages.
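The default behaviour described above can be replayed as a fold over watermark arrivals. This is a plain-Scala model under the stated rule (last watermark per input, initially Long.MinValue, minimum emitted whenever it increases), not Flink's actual operator code; `Arrival` and `emitted` are illustrative names, with input 1 as the primary stream and input 2 as the control stream.

```scala
object DefaultCombineSketch {
  // A watermark arriving on input 1 (primary) or input 2 (control)
  final case class Arrival(input: Int, ts: Long)

  // Modelled default rule: keep the last watermark per input (initially
  // Long.MinValue) and emit the minimum whenever it increases.
  def emitted(arrivals: Seq[Arrival]): Seq[Long] = {
    val start = (Long.MinValue, Long.MinValue, Long.MinValue, Vector.empty[Long])
    val (_, _, _, out) = arrivals.foldLeft(start) {
      case ((w1, w2, last, acc), Arrival(i, ts)) =>
        val (n1, n2) = if (i == 1) (ts, w2) else (w1, ts)
        val m = math.min(n1, n2)
        if (m > last) (n1, n2, m, acc :+ m) else (n1, n2, last, acc)
    }
    out
  }
}
```

With only primary-stream watermarks, e.g. 1000, 2000 and 3000 on input 1, nothing is emitted; a single control watermark of 2500 then releases a combined watermark of 2500.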

I also notice that FLIP-17, the Side Input proposal, does not address this
issue, either in the Wiki or in the Google Docs.

Assuming there is no currently prescribed way to handle this, are folks
taking care of this by introducing a new Assigner after the multiple input
operator to generate watermarks?
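One way such a downstream assigner could derive watermarks, sketched in plain Scala: track the highest element timestamp leaving the two-input operator and lag it by a fixed bound (a bounded-out-of-orderness strategy). This assumes the operator's output elements carry usable event timestamps; `BoundedLagWatermarks` and the bound value are illustrative, not Flink API.

```scala
// Hypothetical downstream assigner: watermark = highest timestamp seen - bound
final class BoundedLagWatermarks(maxOutOfOrdernessMs: Long) {
  private var maxTs: Option[Long] = None

  // Called for each element leaving the two-input operator
  def onElement(ts: Long): Unit =
    maxTs = Some(maxTs.fold(ts)(math.max(_, ts)))

  // Current watermark; Long.MinValue until any element has been seen
  def currentWatermark: Long =
    maxTs.fold(Long.MinValue)(_ - maxOutOfOrdernessMs)
}
```

Such an assigner only helps operators downstream of it; inside the two-input operator itself, event time would still be held back by the silent control input.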