This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push: new 1fe0796 KAFKA-8320 : fix retriable exception package for source connectors (#6675) 1fe0796 is described below commit 1fe07967035d523c0f2133d5f9508c3587d0924c Author: Magesh Nandakumar <magesh.n.ku...@gmail.com> AuthorDate: Wed May 15 15:20:20 2019 -0700 KAFKA-8320 : fix retriable exception package for source connectors (#6675) WorkerSourceTask is catching the exception from the wrong package, org.apache.kafka.common.errors. From the API standpoint, it is not clear which package the Connect framework supports - the one from common or the one from connect. The safest thing would be to support both packages, even though that is less desirable. Author: Magesh Nandakumar <magesh.n.ku...@gmail.com> Reviewers: Arjun Satish <ar...@confluent.io>, Randall Hauch <rha...@gmail.com> --- .../java/org/apache/kafka/connect/runtime/WorkerSourceTask.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 623a210..14f71a5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -21,7 +21,6 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -31,6 +30,7 @@ import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.utils.Time; import 
org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; @@ -242,7 +242,7 @@ class WorkerSourceTask extends WorkerTask { protected List<SourceRecord> poll() throws InterruptedException { try { return task.poll(); - } catch (RetriableException e) { + } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e); // Do nothing. Let the framework poll whenever it's ready. return null; @@ -340,7 +340,7 @@ class WorkerSourceTask extends WorkerTask { } }); lastSendFailed = false; - } catch (RetriableException e) { + } catch (org.apache.kafka.common.errors.RetriableException e) { log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e); toSend = toSend.subList(processed, toSend.size()); lastSendFailed = true;