[ https://issues.apache.org/jira/browse/KAFKA-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bert Baron updated KAFKA-13632:
-------------------------------
    Description: 
We have a setup where we filter records with MirrorMaker 2.0 (see below). This 
results in the following warning message, caused by an NPE in
MirrorSourceTask.commitRecord, for every filtered record:
{code:java}
[2022-01-31 08:01:29,581] WARN [MirrorSourceConnector|task-0] Failure committing record. (org.apache.kafka.connect.mirror.MirrorSourceTask:190)
java.lang.NullPointerException
        at org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
{code}
The cause seems to be that metadata is null for filtered records. Note that the
javadoc of the overridden SourceTask.commitRecord explicitly states that
metadata will be null if the record was filtered.
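
A minimal Java sketch of the kind of null check that would avoid the NPE,
following the contract documented on SourceTask.commitRecord. It is not
MirrorSourceTask's actual code: Metadata is a hypothetical stand-in for
RecordMetadata, and FilteredCommitGuard, committedCount and skippedCount are
illustrative names, chosen so the sketch compiles without Kafka on the classpath.
{code:java}
import java.util.concurrent.atomic.AtomicLong;

public class FilteredCommitGuard {

    // Hypothetical stand-in for org.apache.kafka.clients.producer.RecordMetadata,
    // so that the sketch compiles without Kafka on the classpath.
    public static final class Metadata {
        final String topic;
        final int partition;
        final long offset;

        public Metadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final AtomicLong committed = new AtomicLong();
    private final AtomicLong skipped = new AtomicLong();

    // Same shape as SourceTask.commitRecord(SourceRecord, RecordMetadata), with the
    // record type simplified to Object. Per the SourceTask javadoc, metadata is null
    // when the record was filtered, so return before dereferencing it.
    public void commitRecord(Object record, Metadata metadata) {
        if (metadata == null) {
            skipped.incrementAndGet();
            return;
        }
        // A real task would read metadata.topic/partition/offset here.
        committed.incrementAndGet();
    }

    public long committedCount() { return committed.get(); }

    public long skippedCount() { return skipped.get(); }

    public static void main(String[] args) {
        FilteredCommitGuard task = new FilteredCommitGuard();
        task.commitRecord("filtered record", null);
        task.commitRecord("forwarded record", new Metadata("topic1", 0, 42L));
        System.out.println("committed=" + task.committedCount() + " skipped=" + task.skippedCount());
    }
}
{code}
Running main prints committed=1 skipped=1: the filtered record is skipped
instead of causing a NullPointerException.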

In our case we use a custom predicate, but the issue can be reproduced with the 
following configuration:
{code:java}
clusters = source,target

tasks.max = 1

source.bootstrap.servers = <cluster1>
target.bootstrap.servers = <cluster2>

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

source->target.enabled = true
source->target.topics = topic1
source->target.transforms=Filter
source->target.transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
source->target.transforms.Filter.predicate=HeaderPredicate
source->target.predicates=HeaderPredicate
source->target.predicates.HeaderPredicate.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
source->target.predicates.HeaderPredicate.name=someheader
 {code}
Every record carrying a header with key 'someheader' is dropped by the filter
and results in the NPE and the warning message.
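
To make the trigger concrete, here is a small self-contained Java model of what
the Filter transformation combined with the HasHeaderKey predicate does with the
configuration above: records with a header key 'someheader' are dropped, all
others pass through unchanged. SimpleRecord, hasHeaderKey, filter and run are
hypothetical simplifications, not the Kafka Connect API.
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class HeaderFilterSketch {

    // Hypothetical, simplified record: only the value and the header keys matter here.
    public static final class SimpleRecord {
        final String value;
        final List<String> headerKeys;

        public SimpleRecord(String value, List<String> headerKeys) {
            this.value = value;
            this.headerKeys = headerKeys;
        }
    }

    // Models HasHeaderKey with name=someheader: true if at least one header has that key.
    public static Predicate<SimpleRecord> hasHeaderKey(String name) {
        return r -> r.headerKeys.contains(name);
    }

    // Models Filter applied with a predicate (negate=false): the record is dropped
    // (null is returned) when the predicate matches, otherwise returned unchanged.
    public static SimpleRecord filter(SimpleRecord r, Predicate<SimpleRecord> predicate) {
        return predicate.test(r) ? null : r;
    }

    // Returns the values of the records that survive the filter, in input order.
    public static List<String> run(List<SimpleRecord> input, String headerName) {
        Predicate<SimpleRecord> predicate = hasHeaderKey(headerName);
        List<String> forwarded = new ArrayList<>();
        for (SimpleRecord r : input) {
            SimpleRecord out = filter(r, predicate);
            if (out == null) {
                // Dropped record: per this report, the point where commitRecord
                // is later called with null metadata.
                continue;
            }
            forwarded.add(out.value);
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<SimpleRecord> input = List.of(
                new SimpleRecord("a", List.of()),
                new SimpleRecord("b", List.of("someheader")),
                new SimpleRecord("c", List.of("otherheader")));
        System.out.println(run(input, "someheader"));
    }
}
{code}
Running main prints [a, c]; record "b" is the one that would be dropped and,
in MirrorMaker 2, reach commitRecord with null metadata.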

On a side note, we couldn't find clear documentation on how to configure (SMT)
filtering with MirrorMaker 2, or on whether it is supported at all. Apart from
the NPEs and warning messages, however, filtering appears to work functionally
for us with our custom filter.

 

  was:
We have a setup where we filter records with MirrorMaker 2.0 (see below). This 
results in the following warning message, caused by an NPE in
MirrorSourceTask.commitRecord, for every filtered record:
{code:java}
[2022-01-31 08:01:29,581] WARN [MirrorSourceConnector|task-0] Failure committing record. (org.apache.kafka.connect.mirror.MirrorSourceTask:190)
java.lang.NullPointerException
        at org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
{code}
The cause seems to be that metadata is null for filtered records. Note that the
javadoc of the overridden SourceTask.commitRecord explicitly states that
metadata will be null if the record was filtered.

In our case we use a custom predicate, but the issue can be reproduced with the 
following configuration:
{code:java}
clusters = source,target

tasks.max = 1

source.bootstrap.servers = <cluster1>
target.bootstrap.servers = <cluster2>

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

source->target.enabled = true
source->target.topics = topic1
source->target.transforms=Filter
source->target.transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
source->target.transforms.Filter.predicate=HeaderPredicate
source->target.predicates=HeaderPredicate
source->target.predicates.HeaderPredicate.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
source->target.predicates.HeaderPredicate.name=someheader
 {code}
Every record carrying a header with key 'someheader' is dropped by the filter
and results in the NPE and the warning message.

On a side note, we couldn't find clear documentation on how to configure
filtering with MirrorMaker 2, or on whether it is supported at all. Apart from
the NPEs and warning messages, however, filtering appears to work functionally
for us with our custom filter.

 


> MirrorMaker 2.0 NPE and Warning "Failure to commit records" for filtered records
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-13632
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13632
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.1.0
>            Reporter: Bert Baron
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
