Re: Dropping messages based on timestamp.

2020-05-29 Thread Arvid Heise
Although flatMap() is a valid choice, filter() would be more idiomatic here.
I'd apply it even before the TimestampAssigner runs, unless extracting the
timestamp is rather complicated. If the timestamp is a simple field, it is
cleaner to filter out bad data first and apply any other logic afterwards.
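
A minimal sketch of the filter-first idea, written in plain Java so it runs
without Flink on the classpath. The Event class, the TimestampDropFilter
class, and the "keep timestamps within [min, max]" rule are all hypothetical
and only for illustration. In a Flink job, a predicate like this would be
passed to filter() on the stream ahead of the timestamp assignment step; here
java.util.stream stands in for the DataStream.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class TimestampDropFilterSketch {

    /** Hypothetical event type whose timestamp is a simple field (epoch millis). */
    static final class Event {
        final String id;
        final long timestampMillis;

        Event(String id, long timestampMillis) {
            this.id = id;
            this.timestampMillis = timestampMillis;
        }
    }

    /**
     * Hypothetical pluggable rule: keep an event only if its timestamp lies
     * within [minMillis, maxMillis]. Serializable because functions shipped
     * to a distributed runtime generally need to be.
     */
    static final class TimestampDropFilter implements Predicate<Event>, Serializable {
        private final long minMillis;
        private final long maxMillis;

        TimestampDropFilter(long minMillis, long maxMillis) {
            this.minMillis = minMillis;
            this.maxMillis = maxMillis;
        }

        @Override
        public boolean test(Event e) {
            return e.timestampMillis >= minMillis && e.timestampMillis <= maxMillis;
        }
    }

    /** Applies the predicate first; any timestamp logic would run on the survivors. */
    static List<String> keptIds(List<Event> events, Predicate<Event> keep) {
        return events.stream()
                .filter(keep)
                .map(e -> e.id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", -5L),              // negative timestamp: dropped
                new Event("b", 1_000L),           // kept
                new Event("c", 5_000L),           // kept
                new Event("d", Long.MAX_VALUE));  // beyond the upper bound: dropped

        System.out.println(keptIds(events, new TimestampDropFilter(0L, 10_000L)));
    }
}
```

Keeping the rule in its own small class means it can be reused across jobs
and placed directly next to whatever assigns timestamps.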

On Fri, May 29, 2020 at 1:13 PM Robert Metzger wrote:

> Hi Joe,
>
> my gut feeling is that a flatMap() is what you are looking for.
>
> Best,
> Robert
>
> On Thu, May 28, 2020 at 7:21 PM Joe Malt  wrote:
>
>> Hi,
>>
>> I'm working on a custom TimestampAssigner which will do different things
>> depending on the value of the extracted timestamp. One of the actions I
>> want to take is to drop messages entirely if their timestamp meets certain
>> criteria.
>>
>> Of course there's no direct way to do this in the TimestampAssigner, but
>> I'd like to keep this logic as close to the TimestampAssigner as possible
>> since this is going to be a pluggable component used in a bunch of
>> different Flink apps.
>>
>> What would be the best way to implement this?
>>
>> Thanks,
>> Joe

-- 

Arvid Heise | Senior Java Developer

Re: Dropping messages based on timestamp.

2020-05-29 Thread Robert Metzger
Hi Joe,

my gut feeling is that a flatMap() is what you are looking for.
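
A rough sketch of what the flatMap() approach amounts to: each input produces
zero or one output, so "dropping" simply means emitting nothing. This is
plain Java so it runs without Flink; the Event class, the minMillis cutoff,
and the use of java.util.function.Consumer as a stand-in for Flink's
Collector are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class FlatMapDropSketch {

    /** Hypothetical event type with an epoch-millis timestamp field. */
    static final class Event {
        final String id;
        final long timestampMillis;

        Event(String id, long timestampMillis) {
            this.id = id;
            this.timestampMillis = timestampMillis;
        }
    }

    /**
     * flatMap-style function: emits the event to 'out' or emits nothing.
     * 'out' stands in for the collector a real flatMap function would receive.
     */
    static void flatMap(Event e, long minMillis, Consumer<Event> out) {
        if (e.timestampMillis < minMillis) {
            return; // too old: drop by not emitting anything
        }
        out.accept(e);
    }

    /** Runs the function over a list and returns the ids that were emitted. */
    static List<String> run(List<Event> input, long minMillis) {
        List<String> emitted = new ArrayList<>();
        for (Event e : input) {
            flatMap(e, minMillis, kept -> emitted.add(kept.id));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> input = Arrays.asList(
                new Event("old", 10L),
                new Event("fresh", 2_000L));
        System.out.println(run(input, 1_000L));
    }
}
```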

Best,
Robert

On Thu, May 28, 2020 at 7:21 PM Joe Malt wrote:

> Hi,
>
> I'm working on a custom TimestampAssigner which will do different things
> depending on the value of the extracted timestamp. One of the actions I
> want to take is to drop messages entirely if their timestamp meets certain
> criteria.
>
> Of course there's no direct way to do this in the TimestampAssigner, but
> I'd like to keep this logic as close to the TimestampAssigner as possible
> since this is going to be a pluggable component used in a bunch of
> different Flink apps.
>
> What would be the best way to implement this?
>
> Thanks,
> Joe