Hi Nico,
Thanks a lot. I did consider that, but I had missed the clarification of the contract in the piece of documentation you pointed to: "returning null to allow the Flink Kafka consumer to silently skip the corrupted message". I suppose it could be an improvement for JSONKeyValueDeserializationSchema to provide this behaviour as an out-of-the-box option. In any case, I now have a solution in hand.
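For the record, a minimal sketch of that solution, following Nico's suggestion. It assumes the Flink 1.4 KeyedDeserializationSchema signature used by JSONKeyValueDeserializationSchema; the class name is mine, and the logging is just illustrative:

```java
import java.io.IOException;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

/**
 * Hypothetical wrapper around JSONKeyValueDeserializationSchema that logs
 * parse failures and returns null, which the Flink Kafka consumer's contract
 * treats as "skip this record" instead of failing the job.
 */
public class SkippingJsonKeyValueDeserializationSchema
        extends JSONKeyValueDeserializationSchema {

    public SkippingJsonKeyValueDeserializationSchema(boolean includeMetadata) {
        super(includeMetadata);
    }

    @Override
    public ObjectNode deserialize(byte[] messageKey, byte[] message,
                                  String topic, int partition, long offset)
            throws IOException {
        try {
            return super.deserialize(messageKey, message, topic, partition, offset);
        } catch (IOException e) {
            // JsonParseException extends IOException, so it is caught here.
            // Returning null lets the consumer silently skip the corrupted message.
            System.err.printf("Skipping non-parseable record at %s-%d@%d: %s%n",
                    topic, partition, offset, e.getMessage());
            return null;
        }
    }
}
```

The schema is then passed to FlinkKafkaConsumer011 in place of JSONKeyValueDeserializationSchema, e.g. `new FlinkKafkaConsumer011<>(topic, new SkippingJsonKeyValueDeserializationSchema(true), props)`.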
Thanks again.
Adrian
----- Original message -----
From: Nico Kruber <n...@data-artisans.com>
To: Adrian Vasiliu <vasi...@fr.ibm.com>, user@flink.apache.org
Cc:
Subject: Re: Unrecoverable job failure after Json parse error?
Date: Tue, Jan 16, 2018 11:34 AM
Hi Adrian,
couldn't you solve this by providing your own DeserializationSchema [1],
possibly extending from JSONKeyValueDeserializationSchema and catching
the error there?
Nico
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#the-deserializationschema
On 12/01/18 18:26, Adrian Vasiliu wrote:
> Hello,
>
> When using FlinkKafkaConsumer011 with JSONKeyValueDeserializationSchema,
> if an invalid, non-parsable message is sent to the Kafka topic, the
> consumer expectedly fails with JsonParseException. So far so good, but
> this leads to the following loop: the job switches to FAILED
> then attempts to restart and fails again, and so on. That is, the
> parsing error leads to the Kafka message not being committed, hence it
> keeps being received.
> Since the JsonParseException can't be caught in application code, what
> would be the recommended way to handle the case of possibly
> non-parseable Kafka messages?
>
> Is there a way to configure the Flink Kafka consumer to handle
> non-parseable messages by logging the parsing error and then
> committing the message, so that processing can continue? If there
> isn't, would such an enhancement make sense?
>
> Unless there is a better solution, this looks like a requirement to
> guarantee that FlinkKafkaConsumer011 only receives valid messages, which
> can be annoying in practice.
>
> For reference, here's the stack of the JsonParseException in the log:
>
> Source: Custom Source(1/1) switched to FAILED
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): Expected space separating root-level values
>  at [Source: UNKNOWN; line: 1, column: 3]
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
>  at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2890)
>  at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:55)
>  at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:40)
>  at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:140)
>  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:641)
>  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>  at java.lang.Thread.run(Thread.java:745)
>
> My env: Flink 1.4.0 and kafka_2.11-1.0.0 running locally on Mac.
>
> Thanks,
> Adrian
> Unless stated otherwise above:
> Compagnie IBM France
> Registered office: 17 avenue de l'Europe, 92275 Bois-Colombes Cedex
> Trade register: RCS Nanterre 552 118 465
> Legal form: S.A.S.
> Share capital: €657,364,587
> SIREN/SIRET: 552 118 465 03644 - NAF code 6202A