Thanks for the suggestion, Yun!

On Sun, May 31, 2020 at 11:15 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Yu,
>
> I think when the serializer returns *null, *the following operator should
> still receive a record of null. A possible thought is that the following
> operator may couting the number of null records received and use a metric
> to publish the value to a monitor system, and the monitor system promethus,
> and the monitor system should be able to configure alert conditions.
>
> If *null* has problems, a special indicating object instance may be
> created like NULL_TBASE, and the operator should be able to count the
> number of NULL_TBASE received.
>
> Best,
>  Yun
>
>
> ------------------Original Mail ------------------
> *Sender:*Yu Yang <yuyan...@gmail.com>
> *Send Date:*Mon Jun 1 06:37:35 2020
> *Recipients:*user <user@flink.apache.org>
> *Subject:*best practice for handling corrupted records / exceptions in
> custom DefaultKryoSerializer?
>
>> Hi all,
>>
>> To deal with corrupted messages that can leak into the data source once
>> in a while, we implement a custom DefaultKryoSerializer class as below that
>> catches exceptions. The custom serializer returns null in read(...) method
>> when it encounters exception in reading. With this implementation, the
>> serializer may silently drop records.  One concern is that it may drop too
>> many records before we notice and take actions. What is the best practice
>> to handle this?
>>
>> The serializer processes one record at a time. Will reading a corrupted
>> record make the serialize fail to process the next valid record?
>>
>> public class CustomTBaseSerializer extends TBaseSerializer {
>>      private static final Logger LOG = LoggerFactory.getLogger
>> (CustomTBaseSerializer.class);
>>      @Override
>>      public void write(Kryo kryo, Output output, TBase tBase) {
>>          try {
>>              super.write(kryo, output, tBase);
>>         } catch (Throwable t) {
>>              LOG.error("Failed to write due to unexpected Throwable", t);
>>         }
>>     }
>>
>>      @Override
>>      public TBase read(Kryo kryo, Input input, Class<TBase> tBaseClass) {
>>          try {
>>              return super.read(kryo, input, tBaseClass);
>>         } catch (Throwable t) {
>>              LOG.error("Failed to read from input due to unexpected
>> Throwable", t);
>>              return null;
>>         }
>>      }
>>   }
>>
>> Thank you!
>>
>> Regards,
>> -Yu
>>
>

Reply via email to