Ah ok thanks!

On Sat, 12 Oct 2019 at 11:13, Zhu Zhu <reed...@gmail.com> wrote:

> I mean the Kafka source provided in Flink correctly ignores null
> deserialized values.
>
> isEndOfStream allows you to control when to end the input stream.
> If it is used for running infinite stream jobs, you can simply return
> false.
>
> Thanks,
> Zhu Zhu
>
> On Sat, 12 Oct 2019 at 8:40 PM, John Smith <java.dev....@gmail.com> wrote:
>
>> By the Kafka fetcher, do you mean the Flink JSON schemas? Do they throw
>> IOExceptions?
>>
>> Also, what's the purpose of isEndOfStream? Most schemas I looked at don't
>> do anything but return false.
>>
>> On Fri., Oct. 11, 2019, 11:44 p.m. Zhu Zhu, <reed...@gmail.com> wrote:
>>
>>> Hi John,
>>>
>>> It should work with a *null* return value.
>>> The Javadoc of DeserializationSchema#deserialize says:
>>>
>>>> *@return The deserialized message as an object (null if the message
>>>> cannot be deserialized).*
>>>
>>>
>>> I also checked the Kafka fetcher in Flink and it correctly handles a
>>> null deserialized record.
>>>
>>> Just make sure that *DeserializationSchema#isEndOfStream* also does not
>>> throw errors when it is given a null record.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> On Sat, 12 Oct 2019 at 5:36 AM, John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Hi, I'm using Flink 1.8.0.
>>>>
>>>> I am ingesting data from Kafka; unfortunately, for the time being I have
>>>> not looked into using the schema registry.
>>>>
>>>> So for now I would like to write a simple deserialization schema that
>>>> discards the data if deserialization fails.
>>>>
>>>> The other option is to do it in a flat map with markers and split to a
>>>> dead letter queue, but I'm not too concerned about that for now.
>>>>
>>>> Is it ok to just return null if deserialization fails?
>>>>
>>>> @Override
>>>> public MyObject deserialize(byte[] message) {
>>>>     try {
>>>>         return MyDecoder.decode(message);
>>>>     } catch (IOException ex) {
>>>>         logger.warn("Failed to decode message.", ex);
>>>>         return null;
>>>>     }
>>>> }
>>>>
>>>>
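
For the archive, here is a rough sketch of the pattern discussed above: return null when decoding fails, and keep isEndOfStream safe for a null record. It is written so that it compiles without Flink on the classpath, so SimpleDeserializationSchema is only a hypothetical stand-in for Flink's DeserializationSchema (the real interface also requires getProducedType()), and IntDecoder is a made-up replacement for MyDecoder. Treat it as an illustration under those assumptions, not as the actual Flink API.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Small driver: one decodable payload, one undecodable payload.
class NullOnFailureDemo {
    public static void main(String[] args) {
        NullOnFailureSchema schema = new NullOnFailureSchema();
        System.out.println(schema.deserialize("42".getBytes(StandardCharsets.UTF_8)));
        System.out.println(schema.deserialize("oops".getBytes(StandardCharsets.UTF_8)));
        System.out.println(schema.isEndOfStream(null));
    }
}

// Hypothetical stand-in for Flink's DeserializationSchema, reduced to the
// two methods discussed in this thread.
interface SimpleDeserializationSchema<T> {
    T deserialize(byte[] message) throws IOException;

    boolean isEndOfStream(T nextElement);
}

// Hypothetical decoder standing in for MyDecoder: it parses the payload as
// a UTF-8 decimal integer and reports bad input as an IOException.
final class IntDecoder {
    private IntDecoder() {}

    static Integer decode(byte[] message) throws IOException {
        try {
            return Integer.valueOf(new String(message, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException ex) {
            throw new IOException("Payload is not an integer", ex);
        }
    }
}

final class NullOnFailureSchema implements SimpleDeserializationSchema<Integer> {
    @Override
    public Integer deserialize(byte[] message) {
        try {
            return IntDecoder.decode(message);
        } catch (IOException ex) {
            // Log and return null so the caller can skip this record.
            System.err.println("Failed to decode message: " + ex.getMessage());
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(Integer nextElement) {
        // nextElement may be null after a failed decode, so it is never
        // dereferenced here. An unbounded stream never ends.
        return false;
    }
}
```

In a real job the same two method bodies would go into a class implementing Flink's DeserializationSchema, with getProducedType() added.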