Yu Wang created KAFKA-14266:
-------------------------------

             Summary: MirrorSourceTask will stop mirroring when it gets a corrupt record
                 Key: KAFKA-14266
                 URL: https://issues.apache.org/jira/browse/KAFKA-14266
             Project: Kafka
          Issue Type: Improvement
          Components: KafkaConnect
    Affects Versions: 3.2.3, 2.5.1
            Reporter: Yu Wang


The mirror task keeps throwing this error when it gets a corrupt record:

 
{code:java}
[2022-09-28 22:27:07,125] WARN Failure during poll. (org.apache.kafka.connect.mirror.MirrorSourceTask)
org.apache.kafka.common.KafkaException: Received exception when fetching the next record from TOPIC-261. If needed, please seek past the record to continue consumption.
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Record batch for partition TOPIC-261 at offset 18665849419 is invalid, cause: Record is corrupt (stored crc = 4289549294, computed crc = 3792599753)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1449)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1493)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1550)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1314)
        ... 12 more {code}
 

 

In the poll function of {*}MirrorSourceTask{*}, when the task gets a *KafkaException* it only prints a warning log and returns null.

 
{code:java}
@Override
public List<SourceRecord> poll() {
    if (!consumerAccess.tryAcquire()) {
        return null;
    }
    if (stopping) {
        return null;
    }
    try {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
        List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
        ...
        if (sourceRecords.isEmpty()) {
            // WorkerSourceTasks expects non-zero batch size
            return null;
        } else {
            log.trace("Polled {} records from {}.", sourceRecords.size(), 
records.partitions());
            return sourceRecords;
        }
    } catch (WakeupException e) {
        return null;
    } catch (KafkaException e) {
        log.warn("Failure during poll.", e);
        return null;
    } catch (Throwable e)  {
        log.error("Failure during poll.", e);
        // allow Connect to deal with the exception
        throw e;
    } finally {
        consumerAccess.release();
    }
} {code}
Because the consumer keeps throwing the exception every time it reads the corrupt record, the *MirrorSourceTask* cannot fetch any further records and stays blocked on the same offset.
{code:java}
private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
    // Error when fetching the next record before deserialization.
    if (corruptLastRecord)
        throw new KafkaException("Received exception when fetching the next record from " + partition
                                     + ". If needed, please seek past the record to "
                                     + "continue consumption.", cachedRecordException);

...

} {code}
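The exception message itself suggests seeking past the corrupt record. As a rough illustration only (this is not something *MirrorSourceTask* does today, and it assumes the partition and offset are taken from the WARN log above), such a skip would look like:
{code:java}
// Hypothetical manual skip of the corrupt record, using the partition and offset
// reported in the log above; consumer.seek(...) is the standard KafkaConsumer API
// the exception message refers to.
TopicPartition partition = new TopicPartition("TOPIC", 261); // "TOPIC-261" from the WARN log
long corruptOffset = 18665849419L;                           // offset from the "Record is corrupt" message
consumer.seek(partition, corruptOffset + 1);                 // seek past the corrupt record so poll() can continue
{code}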
Because this issue does not surface in any metric that users can alert on, once the retention time of the source topic is reached, the records after the corrupt one are deleted and can no longer be mirrored.

So it would be better if the mirror source task either rethrew the exception or exposed a metric that lets users alert on this kind of issue.
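A sketch of what that could look like, based on the poll() shown above (the metric call is hypothetical and only illustrates the idea):
{code:java}
@Override
public List<SourceRecord> poll() {
    if (!consumerAccess.tryAcquire()) {
        return null;
    }
    if (stopping) {
        return null;
    }
    try {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
        List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
        ...
    } catch (WakeupException e) {
        return null;
    } catch (KafkaException e) {
        log.warn("Failure during poll.", e);
        // Option 1: record a failure metric so users can alert on it;
        // "recordPollFailure" is a hypothetical hook, not an existing method.
        // metrics.recordPollFailure(e);
        // Option 2: rethrow so Connect marks the task FAILED instead of
        // silently retrying the same corrupt offset forever.
        throw e;
    } catch (Throwable e) {
        log.error("Failure during poll.", e);
        // allow Connect to deal with the exception
        throw e;
    } finally {
        consumerAccess.release();
    }
} {code}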

 


