[jira] [Resolved] (KAFKA-6396) Possibly kafka-connect converter should be able to stop processing chain
[ https://issues.apache.org/jira/browse/KAFKA-6396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Koval resolved KAFKA-6396.
    Resolution: Invalid

Thank you for the explanation.

> Possibly kafka-connect converter should be able to stop processing chain
>
>                 Key: KAFKA-6396
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6396
>             Project: Kafka
>          Issue Type: Wish
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>            Reporter: Alexander Koval
>            Priority: Minor
>
> At present, only transformations can discard records, by returning null.
> But I think it would sometimes be nice to stop the processing chain right
> after converting a message. For example, I have some tags shipped with the
> message key, and I want to stop processing a message after converting its
> key (there are a lot of messages and I don't want to deserialize message
> values that I don't need).
> At the moment, to do that I have to disable the converters and move message
> deserialization into the transformation chain:
> {code}
> key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
> value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
> transforms=proto,catalog
> transforms.proto.type=company.evo.kafka.ProtobufTransformation
> transforms.proto.key.protobuf.class=company.evo.uaprom.indexator.KeyProto$KeyMessage
> transforms.proto.value.protobuf.class=company.evo.uaprom.indexator.catalog.CompanyProto$UniversalCompanyMessage
> transforms.proto.tag=catalog
> {code}
> If
> [WorkerSinkTask|https://github.com/apache/kafka/blob/1.0.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L453]
> checked converted values for {{null}}, it would solve my problem more
> gracefully.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6466) Kafka connect task sometimes fails on start-up
Alexander Koval created KAFKA-6466:

             Summary: Kafka connect task sometimes fails on start-up
                 Key: KAFKA-6466
                 URL: https://issues.apache.org/jira/browse/KAFKA-6466
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 1.0.0
            Reporter: Alexander Koval

We use Kafka Connect for indexing into Elasticsearch. Sometimes, when we update our kafka-connect application, one or several tasks end up in the {{FAILED}} status. Restarting the connector that has the failed task usually helps.

Below is an example of a failed task's stack trace:
{code:java}
java.lang.IllegalArgumentException: A metric named 'MetricName [name=offset-commit-max-time-ms, group=connector-task-metrics, description=The maximum time in milliseconds taken by this task to commit offsets., tags={connector=prom-ua-catalog-product, task=2}]' already exists, can't register another one.
    at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
    at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
    at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
    at org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.<init>(WorkerTask.java:328)
    at org.apache.kafka.connect.runtime.WorkerTask.<init>(WorkerTask.java:69)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.<init>(WorkerSinkTask.java:98)
    at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
    at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6407) Sink task metrics are the same for all connectors
Alexander Koval created KAFKA-6407:

             Summary: Sink task metrics are the same for all connectors
                 Key: KAFKA-6407
                 URL: https://issues.apache.org/jira/browse/KAFKA-6407
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 1.0.0
            Reporter: Alexander Koval
            Priority: Minor

I have a lot of sink connectors inside a distributed worker. When I tried to save metrics to graphite I discovered all task metrics are identical.
{code}
$>get -b kafka.connect:type=sink-task-metrics,connector=prom-by-catalog-company,task=0 sink-record-read-total
#mbean = kafka.connect:type=sink-task-metrics,connector=prom-by-catalog-company,task=0:
sink-record-read-total = 228744.0;

$>get -b kafka.connect:type=sink-task-metrics,connector=prom-kz-catalog-product,task=0 sink-record-read-total
#mbean = kafka.connect:type=sink-task-metrics,connector=prom-kz-catalog-product,task=0:
sink-record-read-total = 228744.0;

$>get -b kafka.connect:type=sink-task-metrics,connector=prom-ru-catalog-company,task=0 sink-record-read-total
#mbean = kafka.connect:type=sink-task-metrics,connector=prom-ru-catalog-company,task=0:
sink-record-read-total = 228744.0;
{code}

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6396) Possibly kafka-connect converter should be able to stop processing chain
Alexander Koval created KAFKA-6396:

             Summary: Possibly kafka-connect converter should be able to stop processing chain
                 Key: KAFKA-6396
                 URL: https://issues.apache.org/jira/browse/KAFKA-6396
             Project: Kafka
          Issue Type: Wish
          Components: KafkaConnect
    Affects Versions: 1.0.0
            Reporter: Alexander Koval
            Priority: Minor

At present, only transformations can discard records, by returning null. But I think it would sometimes be nice to stop the processing chain right after converting a message. For example, I have some tags shipped with the message key, and I want to stop processing a message after converting its key (there are a lot of messages and I don't want to deserialize message values that I don't need).

At the moment, to do that I have to disable the converters and move message deserialization into the transformation chain:
{code}
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
transforms=proto,catalog
transforms.proto.type=company.evo.kafka.ProtobufTransformation
transforms.proto.key.protobuf.class=company.evo.uaprom.indexator.KeyProto$KeyMessage
transforms.proto.value.protobuf.class=company.evo.uaprom.indexator.catalog.CompanyProto$UniversalCompanyMessage
transforms.proto.tag=catalog
{code}
If [WorkerSinkTask|https://github.com/apache/kafka/blob/1.0.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L453] checked converted values for {{null}}, it would solve my problem more gracefully.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
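
The workaround described in KAFKA-6396 (pass raw bytes through the converters, decode the key first, and decode the value only when the key's tag matches) can be sketched in plain Java. This is a model of the idea under stated assumptions, not the Kafka Connect API and not the reporter's code: `RawRecord`, `decodeIfTagged`, and the UTF-8 string decoders are hypothetical stand-ins for the Protobuf classes named in the configuration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class KeyFirstFilter {

    /** Hypothetical raw record: undecoded key and value bytes, as a byte-array pass-through would deliver them. */
    static final class RawRecord {
        final byte[] key;
        final byte[] value;

        RawRecord(byte[] key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Decodes the key first. Returns null (meaning "discard the record") when the
     * decoded tag does not equal wantedTag; the value decoder runs only for
     * records that pass this check.
     */
    static <V> V decodeIfTagged(RawRecord record, String wantedTag,
                                Function<byte[], String> keyDecoder,
                                Function<byte[], V> valueDecoder) {
        String tag = keyDecoder.apply(record.key);
        if (!wantedTag.equals(tag)) {
            return null; // discarded: the value bytes are never deserialized
        }
        return valueDecoder.apply(record.value);
    }

    private static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<RawRecord> records = new ArrayList<>();
        records.add(new RawRecord(utf8("catalog"), utf8("a")));
        records.add(new RawRecord(utf8("other"), utf8("b")));
        records.add(new RawRecord(utf8("catalog"), utf8("c")));

        int[] valueDecodes = {0}; // counts how often the (expensive) value decoder runs
        Function<byte[], String> keyDecoder = b -> new String(b, StandardCharsets.UTF_8);
        Function<byte[], String> valueDecoder = b -> {
            valueDecodes[0]++;
            return new String(b, StandardCharsets.UTF_8);
        };

        List<String> kept = new ArrayList<>();
        for (RawRecord r : records) {
            String value = decodeIfTagged(r, "catalog", keyDecoder, valueDecoder);
            if (value != null) {
                kept.add(value);
            }
        }
        // prints "kept=[a, c] valueDecodes=2"
        System.out.println("kept=" + kept + " valueDecodes=" + valueDecodes[0]);
    }
}
```

In this model the record tagged `other` is dropped before its value is touched, which is the saving the issue asks for; in Connect itself the same early exit currently has to live inside a transformation, as the configuration above shows.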