Hey Mark,

The ValidateRecord processor was what I was looking for!

As we are using the _0_10 version of the PublishKafkaRecord processor, we don’t 
seem to have any of the transactional properties listed. I’m guessing these are 
only available on the newer Kafka API processors?

Kind regards,

Nathan

From: Mark Payne [mailto:[email protected]]
Sent: 25 February 2021 17:08
To: [email protected]
Subject: Re: Converting Records

Nathan,

Have you taken a look at ValidateRecord yet? That should allow you to separate 
out the records that do not match the schema, before trying to publish to Kafka. 
Regarding the fact that some messages are already delivered to Kafka when you 
encounter a failure - this can be avoided if you configure the publisher to use 
transactions when sending to Kafka. Either all records in a FlowFile will go, 
or none of them will go. However, once you’ve separated out the invalid 
records, I suspect this will be less of a concern.
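Outside of NiFi, the same idea (split out the records that fail schema 
validation, then publish the remainder in a single Kafka transaction so they 
land all-or-nothing) looks roughly like the sketch below. This is only an 
illustration, assuming the confluent-kafka-python and fastavro libraries; the 
topic name, transactional.id and schema are made up.

# Minimal sketch (assumptions: confluent-kafka-python and fastavro installed;
# topic name, transactional.id and schema are invented for illustration).
import io

from confluent_kafka import Producer, KafkaException
from fastavro import parse_schema, schemaless_writer
from fastavro.validation import validate

SCHEMA = parse_schema({
    "type": "record",
    "name": "Example",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
})

def split_valid_invalid(records):
    # Roughly what ValidateRecord does per record: route to valid or invalid.
    valid, invalid = [], []
    for rec in records:
        (valid if validate(rec, SCHEMA, raise_errors=False) else invalid).append(rec)
    return valid, invalid

def publish_all_or_nothing(records, topic="normalised-records"):
    # Transactional publish: either every record is committed or none are.
    producer = Producer({
        "bootstrap.servers": "localhost:9092",
        "transactional.id": "validate-then-publish-demo",  # required for transactions
    })
    producer.init_transactions()
    producer.begin_transaction()
    try:
        for rec in records:
            buf = io.BytesIO()
            schemaless_writer(buf, SCHEMA, rec)   # Avro-encode the record
            producer.produce(topic, value=buf.getvalue())
        producer.commit_transaction()             # all records become visible together
    except KafkaException:
        producer.abort_transaction()              # none of them become visible
        raise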

Thanks
-Mark



On Feb 25, 2021, at 7:44 AM, 
[email protected]<mailto:[email protected]> wrote:

Hi All,

I’ve had a look at various processors and the documentation and can’t seem to 
find any information. So I’m hoping someone may have an idea or point me in the 
right direction.

We consume from various data sources, normalise the data and then produce the 
records to Kafka in Avro format, using the PublishKafkaRecord processor 
(specifically 0_10).

However, one data source occasionally sends a message that cannot be converted 
from JSON to Avro because a field has no default value in the Avro schema. We 
understand why this happens, and it wouldn’t be a massive issue if the flow 
file only contained one message. However, our flow files contain up to 1000 
individual records. When one record in the flow file is invalid, the whole flow 
file fails, even if some of its records have already been published to the 
Kafka topic. This feels far from ideal: we didn’t realise this was the case 
until we tried to reprocess the file with the error still in place, and we saw 
a massive increase in records published to Kafka because all records before the 
invalid one were republished multiple times (via the looped-back queue) while 
we tried to understand the error.
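To illustrate the schema issue (a minimal sketch using fastavro; the field name 
and schemas below are invented), a required field with no default fails 
validation when the incoming JSON is missing it, while the same field declared 
as a nullable union with a null default does not:

# Sketch of the failure mode (assumptions: fastavro installed; field names
# and schemas are made up for illustration).
from fastavro import parse_schema
from fastavro.validation import validate

# Required field, no default: a JSON message missing "middle_name" cannot be
# converted to this schema.
strict = parse_schema({
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "middle_name", "type": "string"},
    ],
})

# Same field made optional (nullable union with a null default): the missing
# field no longer breaks conversion.
lenient = parse_schema({
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "middle_name", "type": ["null", "string"], "default": None},
    ],
})

msg = {"first_name": "Nathan"}                     # incoming record, field missing
print(validate(msg, strict, raise_errors=False))   # False - this record would fail
print(validate(msg, lenient, raise_errors=False))  # True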

Are there any processors or other solutions we can use to validate the flow 
file content before we produce to Kafka, and to reject only the individual 
records that are invalid? For example, I have a flow file with 100 records, 2 
of which are not valid against the schema. The 98 valid records continue to be 
published, and the 2 that are invalid get rejected and go down a failure queue.

We are on NiFi v1.11.4 and are looking to upgrade to v1.12.1 in the coming 
months.

Kind Regards,

Nathan
