Hi,

These are valid concerns. And yes, AFAIK users have been writing to logs within 
the deserialization schema to track this. The connectors themselves currently 
log nothing when a record is skipped.

I think we can implement both logging and metrics to track this, most of which 
you have already brought up.
For logging, the information should contain topic, partition, and offset for 
debugging.
For metrics, we should be able to use the user variable functionality to have 
skip counters that can be grouped by topic / partition / offset.
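
To make the logging and counter idea concrete, here is a rough sketch in plain 
Java. It assumes nothing about the actual connector code: SkipTracker, 
recordSkip and skippedCount are hypothetical names, and a LongAdder stands in 
for a real metric counter. The offset goes only into the log line here, on the 
assumption that a counter per offset would mean one metric per record.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Logger;

/** Hypothetical helper, not part of any Flink connector. */
public class SkipTracker {
    private static final Logger LOG = Logger.getLogger(SkipTracker.class.getName());

    // One counter per "topic/partition" key (stand-in for a real metric counter).
    private final Map<String, LongAdder> skipped = new ConcurrentHashMap<>();

    /** Logs where the skipped record came from and bumps the matching counter. */
    public void recordSkip(String topic, int partition, long offset) {
        LOG.warning(String.format(
                "Skipped undeserializable record: topic=%s partition=%d offset=%d",
                topic, partition, offset));
        skipped.computeIfAbsent(topic + "/" + partition, k -> new LongAdder()).increment();
    }

    /** Returns how many records were skipped for the given topic and partition. */
    public long skippedCount(String topic, int partition) {
        LongAdder adder = skipped.get(topic + "/" + partition);
        return adder == null ? 0L : adder.sum();
    }
}
```

A source would call recordSkip(...) at the point where it sees a null from the 
schema, since that is where topic, partition and offset are all known.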

Though, I’m not sure how helpful this would be in practice.
I’ve opened a JIRA for this issue for further discussion: 
https://issues.apache.org/jira/browse/FLINK-9204

Cheers,
Gordon
On 16 April 2018 at 7:43:00 PM, Fabian Hueske (fhue...@gmail.com) wrote:

Thanks for starting the discussion, Elias.

I see two ways to address this issue.

1) Add an interface that a deserialization schema can implement to register 
metrics. Each source would need to check for the interface and call it to set up 
metrics.
2) Check for null returns in the source functions and increment a respective 
counter.
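
Both options can be sketched side by side in plain Java. This is only an 
illustration under assumptions: Schema, MetricAware, Source and IntSchema are 
hypothetical stand-ins, not existing Flink classes, and an AtomicLong plays the 
role of a metric counter.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class SkipCountSketch {

    /** Hypothetical stand-in for a deserialization schema; null means "skip this record". */
    public interface Schema<T> {
        T deserialize(byte[] message);
    }

    /** Option 1: hypothetical opt-in interface through which a schema receives a counter. */
    public interface MetricAware {
        void registerMetrics(AtomicLong errorCounter);
    }

    /** Hypothetical stand-in for a source's fetch loop. */
    public static class Source<T> {
        public final AtomicLong schemaErrors = new AtomicLong(); // handed to the schema (option 1)
        public final AtomicLong nullRecords = new AtomicLong();  // maintained by the source (option 2)
        private final Schema<T> schema;

        public Source(Schema<T> schema) {
            this.schema = schema;
            // Option 1: the source checks for the interface and calls it.
            if (schema instanceof MetricAware) {
                ((MetricAware) schema).registerMetrics(schemaErrors);
            }
        }

        public List<T> run(List<byte[]> rawMessages) {
            List<T> out = new ArrayList<>();
            for (byte[] message : rawMessages) {
                T value = schema.deserialize(message);
                if (value == null) {
                    // Option 2: the source counts null returns itself.
                    nullRecords.incrementAndGet();
                    continue;
                }
                out.add(value);
            }
            return out;
        }
    }

    /** Example schema that opts in to option 1 and returns null on bad input. */
    public static class IntSchema implements Schema<Integer>, MetricAware {
        private AtomicLong errors = new AtomicLong();

        @Override
        public void registerMetrics(AtomicLong errorCounter) {
            this.errors = errorCounter;
        }

        @Override
        public Integer deserialize(byte[] message) {
            try {
                return Integer.valueOf(new String(message, StandardCharsets.UTF_8).trim());
            } catch (NumberFormatException e) {
                errors.incrementAndGet();
                return null;
            }
        }
    }
}
```

Option 2 needs no change to existing schemas, but it cannot tell a corrupt 
record from one the schema dropped on purpose. Option 1 leaves that distinction 
to the schema.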

In both cases, we need to touch the source connectors.

I see that information such as topic name, partition, and offset is important 
for debugging. However, I don't think that metrics are a good way to capture 
it.
In that case, log files might be a better approach.

I'm not sure to what extent the source functions (Kafka, Kinesis) support such 
error tracking.
Adding Gordon to the thread who knows the internals of the connectors.

Best, Fabian

2018-04-08 17:53 GMT+02:00 Alexander Smirnov <alexander.smirn...@gmail.com>:
I have the same question. In the case of the Kafka source, it would be good to 
know the topic name and offset of the corrupted message for further 
investigation.
Looks like the only option is to write messages into a log file.

On Fri, Apr 6, 2018 at 9:12 PM Elias Levy <fearsome.lucid...@gmail.com> wrote:
I was wondering how folks are tracking deserialization errors.  The 
AbstractDeserializationSchema class provides no mechanism for the 
deserializer to instantiate a metric counter, and "deserialize" must return 
null instead of raising an exception on error if you want your job to 
keep running through a deserialization error.  But that means such 
errors are invisible.

Thoughts?
