What I had in mind was generic handling of the JsonParseException case. But you are right, the picture becomes fuzzier if we also consider messages that are parseable but invalid due to missing or invalid fields. We could imagine a deeper message-validation feature, but I think subclassing to implement custom handling is okay...
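
For the record, here is a minimal sketch of the subclassing approach (the class name is my own; this assumes the Flink 1.4 API, where JsonParseException extends IOException, so catching IOException covers parse failures):

```java
import java.io.IOException;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

/**
 * A JSONKeyValueDeserializationSchema that skips unparseable messages
 * instead of failing the job: returning null tells the Flink Kafka
 * consumer to silently skip the corrupted record.
 */
public class SkippingJsonDeserializationSchema extends JSONKeyValueDeserializationSchema {

    public SkippingJsonDeserializationSchema(boolean includeMetadata) {
        super(includeMetadata);
    }

    @Override
    public ObjectNode deserialize(byte[] messageKey, byte[] message,
                                  String topic, int partition, long offset) throws IOException {
        try {
            return super.deserialize(messageKey, message, topic, partition, offset);
        } catch (IOException e) {
            // JsonParseException is an IOException; log and return null so
            // the consumer advances past the corrupted record.
            return null;
        }
    }
}
```

Passing an instance of this schema to FlinkKafkaConsumer011 in place of the stock JSONKeyValueDeserializationSchema makes the consumer drop unparseable records rather than looping on FAILED/restart.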
Thanks
Adrian
 
----- Original message -----
From: Nico Kruber <n...@data-artisans.com>
To: Adrian Vasiliu <vasi...@fr.ibm.com>
Cc: user@flink.apache.org
Subject: Re: Unrecoverable job failure after Json parse error?
Date: Tue, Jan 16, 2018 3:18 PM
 
Nice, I didn't even read that far myself :P
-> turns out the API was prepared for that after all

I'm not sure about a default option for handling/skipping corrupted
messages since the handling of those is probably highly use-case
specific. If you nonetheless feel that this should be in there, feel
free to open an improvement request in our issue tracker at
https://issues.apache.org/jira/browse/FLINK


Nico

On 16/01/18 13:35, Adrian Vasiliu wrote:
> Hi Nico,
> Thanks a lot. I did consider that, but I'd missed the clarification of
> the contract brought by the piece of doc you
> pointed to: "returning null to allow the Flink Kafka consumer to silently
> skip the corrupted message".
> I suppose it could be an improvement
> for JSONKeyValueDeserializationSchema to provide this behaviour as an
> out-of-the-box option. But anyway, I do have a solution in hand.
> Thanks again.
> Adrian
>  
>
>     ----- Original message -----
>     From: Nico Kruber <n...@data-artisans.com>
>     To: Adrian Vasiliu <vasi...@fr.ibm.com>, user@flink.apache.org
>     Cc:
>     Subject: Re: Unrecoverable job failure after Json parse error?
>     Date: Tue, Jan 16, 2018 11:34 AM
>      
>     Hi Adrian,
>     couldn't you solve this by providing your own DeserializationSchema [1],
>     possibly extending from JSONKeyValueDeserializationSchema and catching
>     the error there?
>
>
>     Nico
>
>     [1]
>     https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#the-deserializationschema
>
>     On 12/01/18 18:26, Adrian Vasiliu wrote:
>     > Hello,
>     >
>     > When using FlinkKafkaConsumer011 with JSONKeyValueDeserializationSchema,
>     > if an invalid, non-parseable message is sent to the Kafka topic, the
>     > consumer fails, as expected, with JsonParseException. So far so good, but
>     > this leads to the following loop: the job switches to FAILED
>     > then attempts to restart and fails again, and so on. That is, the
>     > parsing error leads to the Kafka message not being committed, hence it
>     > keeps being received. 
>     > Since the JsonParseException can't be caught in application code,
>     > what would be the recommended way to handle the case of possibly
>     > non-parseable Kafka messages?
>     >  
>     > Is there a way to configure the Flink Kafka consumer to handle
>     > non-parseable messages by logging the parsing error and then
>     > committing the message so that processing can continue? If there
>     > isn't, would such an enhancement make sense?
>     >
>     > Unless there is a better solution, it looks like a requirement to
>     > guarantee that FlinkKafkaConsumer011 only receives valid messages,
>     > which can be annoying in practice.
>     >
>     > For reference, here's the stack of the JsonParseException in the log:
>     >
>     > Source: Custom Source(1/1) switched to FAILED
>     >
>     > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
>     > Unexpected character (':' (code 58)): Expected space separating root-level values
>     >  at [Source: UNKNOWN; line: 1, column: 3]
>     >  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
>     >  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
>     >  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
>     >  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
>     >  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
>     >  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
>     >  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
>     >  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
>     >  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
>     >  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
>     >  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2890)
>     >  at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:55)
>     >  at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:40)
>     >  at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:140)
>     >  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:641)
>     >  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>     >  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     >  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>     >  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     >  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     >  at java.lang.Thread.run(Thread.java:745)
>     >
>     > My env: Flink 1.4.0 and kafka_2.11-1.0.0 running locally on Mac.
>     >
>     > Thanks,
>     > Adrian
>     > Sauf indication contraire ci-dessus:/ Unless stated otherwise above:
>     > Compagnie IBM France
>     > Siège Social : 17 avenue de l'Europe, 92275 Bois-Colombes Cedex
>     > RCS Nanterre 552 118 465
>     > Forme Sociale : S.A.S.
>     > Capital Social : 657.364.587 €
>     > SIREN/SIRET : 552 118 465 03644 - Code NAF 6202A
>      
>      
>
>  

 
 
