[ https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939952#comment-16939952 ]

Sergiu Pantiru commented on KAFKA-4740:
---------------------------------------

Hello,

I was using the Avro deserializer, and when an "unknown" message came through,
the consumer went into an infinite loop and filled a number of log files with
deserialization exceptions.

I came across this bug and still do not see a proper fix, as the PR is not yet
merged (apart from the workaround of using a silent deserializer).

If I may ask: why is the same message retried when its deserialization failed
the first time? My assumption is that it will fail the second time as well,
and every time after that.

To me it would make more sense if the message were re-fetched only until a
maximum number of retries was reached. That way the client could configure
what happens to the messages that fail.
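
A rough sketch of the behaviour I have in mind, built client-side on top of
the current API (the {{RetryBudgetPoller}} class is hypothetical, and
recovering the failing position by parsing the exception message is admittedly
fragile, since {{SerializationException}} does not expose it):
{code:java}
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;

public final class RetryBudgetPoller {
    // Matches messages like:
    // "Error deserializing key/value for partition topic-0 at offset 2"
    private static final Pattern POSITION =
            Pattern.compile("partition (.+)-(\\d+) at offset (\\d+)");

    private RetryBudgetPoller() {}

    // Retries poll() up to maxRetries times when deserialization fails,
    // then seeks past the poison record so consumption can move on.
    public static <K, V> ConsumerRecords<K, V> poll(Consumer<K, V> consumer,
                                                    long timeoutMs,
                                                    int maxRetries) {
        SerializationException lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return consumer.poll(timeoutMs);
            } catch (SerializationException e) {
                lastFailure = e; // same record, same failure, retry
            }
        }
        // Budget exhausted: recover the failing position from the message
        // (nothing better is exposed) and skip past the record.
        Matcher m = POSITION.matcher(lastFailure.getMessage());
        if (m.find()) {
            TopicPartition tp = new TopicPartition(
                    m.group(1), Integer.parseInt(m.group(2)));
            consumer.seek(tp, Long.parseLong(m.group(3)) + 1);
        }
        throw lastFailure;
    }
}
{code}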

The current retry logic lives in {{Fetcher}}:
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1522]
{code:java}
try {
    for (int i = 0; i < maxRecords; i++) {
        // Only move to next record if there was no exception in the last fetch.
        // Otherwise we should use the last record to do deserialization again.
        if (cachedRecordException == null) {
            corruptLastRecord = true;
            lastRecord = nextFetchedRecord();
            corruptLastRecord = false;
        }
        if (lastRecord == null)
            break;
        records.add(parseRecord(partition, currentBatch, lastRecord));
        recordsRead++;
        bytesRead += lastRecord.sizeInBytes();
        nextFetchOffset = lastRecord.offset() + 1;
        // In some cases, the deserialization may have thrown an exception and
        // the retry may succeed, we allow user to move forward in this case.
        cachedRecordException = null;
    }
} catch (SerializationException se) {
    cachedRecordException = se;
    if (records.isEmpty())
        throw se;
} catch (KafkaException e) {
    cachedRecordException = e;
    if (records.isEmpty())
        throw new KafkaException("Received exception when fetching the next record from " + partition
                + ". If needed, please seek past the record to continue consumption.", e);
}
{code}
Thanks,
 Sergiu

> Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4740
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4740
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>         Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on the broker version)
> Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>            Reporter: Sébastien Launay
>            Assignee: Sébastien Launay
>            Priority: Critical
>
> The old consumer supports deserializing records into typed objects and throws
> a {{SerializationException}} through {{MessageAndMetadata#key()}} and
> {{MessageAndMetadata#message()}} that can be caught by the client \[1\].
> When using the new consumer API with kafka-clients versions < 0.10.0.1, such
> an exception is swallowed by the {{NetworkClient}} class and results in an
> infinite loop that the client has no control over:
> {noformat}
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test2-0 to earliest offset.
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 for partition test2-0
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> ...
> {noformat}
> Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0 but another 
> issue still remains.
> Indeed, the client can now catch the {{SerializationException}} but the next 
> call to {{Consumer#poll(long)}} will throw the same exception indefinitely.
> The following snippet (a full example for most released kafka-clients
> versions is available on GitHub \[2\]):
> {code:java}
> try (KafkaConsumer<String, Integer> kafkaConsumer = new KafkaConsumer<>(
>         consumerConfig, new StringDeserializer(), new IntegerDeserializer())) {
>     kafkaConsumer.subscribe(Arrays.asList("topic"));
>     // Will run till the shutdown hook is called
>     while (!doStop) {
>         try {
>             ConsumerRecords<String, Integer> records = kafkaConsumer.poll(1000);
>             if (!records.isEmpty()) {
>                 logger.info("Got {} messages", records.count());
>                 for (ConsumerRecord<String, Integer> record : records) {
>                     logger.info("Message with partition: {}, offset: {}, key: {}, value: {}",
>                         record.partition(), record.offset(), record.key(), record.value());
>                 }
>             } else {
>                 logger.info("No messages to consume");
>             }
>         } catch (SerializationException e) {
>             logger.warn("Failed polling some records", e);
>         }
>     }
> }
> {code}
> when run with the following records (the third record has an invalid Integer value):
> {noformat}
>     printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
>     printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
>     printf "\x00\x00\x00\n"     | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
>     printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> {noformat}
> will produce the following logs:
> {noformat}
> INFO  consumer.Consumer - Got 2 messages
> INFO  consumer.Consumer - Message with partition: 0, offset: 0, key: null, value: 0
> INFO  consumer.Consumer - Message with partition: 0, offset: 1, key: null, value: 1
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> ...
> {noformat}
> I don't believe committing offsets would help, and even if it did, it could
> result in a few well-formed records from that {{ConsumerRecords}} batch not
> being consumed (data loss).
> I have only seen a few mentions of this bug online \[3\], but I believe this
> is a critical issue, as the new consumer API is no longer in beta. If you do
> not control the producers (which can inject malformed values), or you use an
> advanced deserializer that throws such an exception (e.g. the schema-registry
> {{KafkaAvroDeserializer}}), then a consumer can end up blocked from advancing
> in the stream.
> Current workarounds:
> - use a {{Deserializer}} that does not throw a {{SerializationException}}
> (e.g. {{ByteArrayDeserializer}}, {{StringDeserializer}})
> - wrap the {{Deserializer}} to catch and log the {{SerializationException}},
> return {{null}} instead, and then check for {{null}} in the client code
> (that's what we use on top of {{KafkaAvroDeserializer}} in case there is an
> issue reaching the schema registry, or the Avro datum is invalid or not
> compatible with the reader's schema); a sketch of this approach follows below
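> A minimal sketch of such a wrapping deserializer (the {{SilentDeserializer}}
> name is illustrative, not from the example repository \[2\]):
> {code:java}
> import java.util.Map;
> import org.apache.kafka.common.errors.SerializationException;
> import org.apache.kafka.common.serialization.Deserializer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> // Delegates to any Deserializer and returns null instead of letting a
> // SerializationException escape into the Fetcher; client code must then
> // treat null keys/values as "skip this record".
> public class SilentDeserializer<T> implements Deserializer<T> {
>     private static final Logger logger =
>             LoggerFactory.getLogger(SilentDeserializer.class);
>     private final Deserializer<T> delegate;
>
>     public SilentDeserializer(Deserializer<T> delegate) {
>         this.delegate = delegate;
>     }
>
>     @Override
>     public void configure(Map<String, ?> configs, boolean isKey) {
>         delegate.configure(configs, isKey);
>     }
>
>     @Override
>     public T deserialize(String topic, byte[] data) {
>         try {
>             return delegate.deserialize(topic, data);
>         } catch (SerializationException e) {
>             logger.warn("Dropping undeserializable record on topic {}", topic, e);
>             return null;
>         }
>     }
>
>     @Override
>     public void close() {
>         delegate.close();
>     }
> }
> {code}
> It would be wired in as e.g. {{new KafkaConsumer<>(config, new StringDeserializer(), new SilentDeserializer<>(new IntegerDeserializer()))}}.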
> Potential solutions:
> # continue to throw {{SerializationException}} when calling 
> {{Consumer#poll(long)}} but skip that malformed record on next 
> {{Consumer#poll(long)}}
> # do not throw {{SerializationException}} when calling 
> {{Consumer#poll(long)}} but expose information about invalid records in 
> {{ConsumerRecords}}
> # do not throw {{SerializationException}} when calling
> {{Consumer#poll(long)}} but store the exception(s) in the {{ConsumerRecord}}
> object so that they are rethrown on {{ConsumerRecord#key()}} and
> {{ConsumerRecord#value()}}
> # do not deserialize records during {{Consumer#poll()}} but do it when
> calling {{ConsumerRecord#key()}} and {{ConsumerRecord#value()}} (similar to
> the old consumer; see the sketch after this list)
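> A purely illustrative sketch of that last option (the {{LazyRecord}} name
> and shape are hypothetical, not a proposed API):
> {code:java}
> import org.apache.kafka.common.serialization.Deserializer;
>
> // Keeps the raw bytes and defers deserialization until access, so that
> // poll() itself never throws; only key()/value() can.
> public class LazyRecord<K, V> {
>     private final String topic;
>     private final byte[] rawKey;
>     private final byte[] rawValue;
>     private final Deserializer<K> keyDeserializer;
>     private final Deserializer<V> valueDeserializer;
>
>     public LazyRecord(String topic, byte[] rawKey, byte[] rawValue,
>                       Deserializer<K> keyDeserializer,
>                       Deserializer<V> valueDeserializer) {
>         this.topic = topic;
>         this.rawKey = rawKey;
>         this.rawValue = rawValue;
>         this.keyDeserializer = keyDeserializer;
>         this.valueDeserializer = valueDeserializer;
>     }
>
>     // May throw SerializationException, but only for this record:
>     // the rest of the batch stays consumable.
>     public K key() {
>         return keyDeserializer.deserialize(topic, rawKey);
>     }
>
>     public V value() {
>         return valueDeserializer.deserialize(topic, rawValue);
>     }
> }
> {code}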
> I believe any of those solutions breaks compatibility semantics-wise, but not
> necessarily binary compatibility, as {{SerializationException}} is a
> {{RuntimeException}} and so it could be "moved around".
> My preference goes to the last two, and I would be happy to contribute such a
> change, as well as update the documentation on {{SerializationException}} to
> reflect that it is not only used when serializing records.
> \[1\] https://github.com/apache/kafka/blob/0.8.2.2/core/src/main/scala/kafka/message/MessageAndMetadata.scala
> \[1\] http://docs.confluent.io/2.0.0/schema-registry/docs/serializer-formatter.html#serializer
> \[2\] https://github.com/slaunay/kafka-consumer-serialization-exception-example
> \[3\] https://groups.google.com/forum/#!topic/kafka-clients/KBSPmY69H94


