Hi Niclas,

I'd add a Filter to discard the bad records directly; that makes the behavior explicit. If you need to do complex transformations that you don't want to perform twice, the FlatMap approach is the most efficient. If you'd like to keep the bad records, you can implement a ProcessFunction and add a side output that collects them.
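To make the two options concrete, here is a minimal sketch of both the FlatMap approach and the side-output approach. The record types (String in, Integer out), the parsing logic, and the "bad-records" tag name are illustrative assumptions, not taken from Niclas's job:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DiscardExamples {

    // Option 1: FlatMap -- transform and discard in one operator.
    // Emitting zero records silently drops the input element.
    public static class ParseOrDiscard implements FlatMapFunction<String, Integer> {
        @Override
        public void flatMap(String raw, Collector<Integer> out) {
            try {
                out.collect(Integer.parseInt(raw.trim())); // one record on success
            } catch (NumberFormatException e) {
                // zero records: the bad input is discarded here
            }
        }
    }

    // Option 2: ProcessFunction with a side output -- bad records are kept
    // on a separate stream instead of being thrown away.
    public static DataStream<String> parseWithSideOutput(DataStream<String> input) {
        final OutputTag<String> badRecords = new OutputTag<String>("bad-records") {};
        SingleOutputStreamOperator<Integer> parsed =
            input.process(new ProcessFunction<String, Integer>() {
                @Override
                public void processElement(String raw, Context ctx, Collector<Integer> out) {
                    try {
                        out.collect(Integer.parseInt(raw.trim()));
                    } catch (NumberFormatException e) {
                        ctx.output(badRecords, raw); // route the bad record to the side output
                    }
                }
            });
        return parsed.getSideOutput(badRecords); // stream of the rejected inputs
    }
}
```

With the side-output variant, the main stream stays clean while the returned stream of rejects can be logged or written to a dead-letter sink.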
Hope this helps,
Fabian

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/side_output.html

2018-02-19 10:29 GMT+01:00 Niclas Hedhman <nic...@apache.org>:
> Hi again,
>
> something that I don't find (easily) in the documentation is what the
> recommended method is to discard data from the stream.
>
> On one hand, I could always use flatMap(), even if it is "per message",
> since that allows me to return zero or one objects.
>
> DataStream<MyType> stream =
>     env.addSource( source )
>        .flatMap( new MyFunction() )
>
> But that seems a bit misleading, as the casual observer will get the idea
> that MyFunction 'branches' out, but it doesn't.
>
> The other "obvious" choice is to return null and follow with a filter...
>
> DataStream<MyType> stream =
>     env.addSource( source )
>        .map( new MyFunction() )
>        .filter( Objects::nonNull )
>
> BUT, that doesn't work with Java 8 method references like the above, so I
> have to create my own filter to get the type information correct for Flink;
>
> DataStream<MyType> stream =
>     env.addSource( source )
>        .map( new MyFunction() )
>        .filter( new DiscardNullFilter<>() )
>
> And in my opinion, that ends up looking ugly, as the streams/pipelines (not
> used to the terminology yet) quickly have many transformations and
> branches, and having a null check after each one seems to put the burden of
> knowledge in the wrong spot ("Can this function return null?").
>
> Throwing an exception shuts down the entire stream, which seems overly
> aggressive for many data-related discards.
>
> Any other choices?
>
> Cheers
> --
> Niclas Hedhman, Software Developer
> http://zest.apache.org - New Energy for Java
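For reference, the generic null-discarding filter Niclas mentions could be written roughly like this (the class name matches his snippet; the body is my assumption). Implementing FilterFunction as a named class avoids the type-inference trouble he hit with the method reference:

```java
import org.apache.flink.api.common.functions.FilterFunction;

// A reusable filter that drops null elements from a stream,
// usable as .filter( new DiscardNullFilter<>() ).
public class DiscardNullFilter<T> implements FilterFunction<T> {
    @Override
    public boolean filter(T value) {
        return value != null; // keep only non-null records
    }
}
```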