Hi All,

I've had a look at various processors and the documentation but can't find 
anything that covers this, so I'm hoping someone may have an idea or can point 
me in the right direction.

We consume from various data sources, normalise the data, and then produce the 
records to Kafka in Avro format using the PublishKafkaRecord_0_10 processor.

However, one data source occasionally sends a message that cannot be converted 
from JSON to Avro, because a field in the message has no default value in the 
Avro schema. We understand why this happens, and it wouldn't be a massive issue 
if each flow file only contained one message. However, our flow files contain 
up to 1000 individual records, and when one record in the flow file is invalid, 
the whole flow file fails, even if some of its records have already been 
published to the Kafka topic. This feels far from ideal to us, and we didn't 
realise it was the behaviour until we tried to reprocess a file with the error 
still in place. While we were trying to understand the error, we saw a massive 
increase in records published to Kafka, because every record before the invalid 
one was republished each time the flow file went round the looped-back failure 
queue.
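
For illustration (the field and schema names below are made up, not our real 
ones), the failing case is a JSON record that omits a field the Avro schema 
treats as required because it has no default:

    Incoming JSON record, missing "middleName":

    {"id": 42, "firstName": "Jane", "lastName": "Doe"}

    Avro schema, where "middleName" has no default, so conversion fails:

    {
      "type": "record",
      "name": "Person",
      "fields": [
        {"name": "id",         "type": "long"},
        {"name": "firstName",  "type": "string"},
        {"name": "middleName", "type": "string"},
        {"name": "lastName",   "type": "string"}
      ]
    }

We know that declaring the field as, say, {"name": "middleName", "type": 
["null", "string"], "default": null} would let such records convert cleanly; 
our question is about handling the records that don't match the schema as it 
stands.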

Are there any processors or other solutions we could use to validate the flow 
file content against the schema before we produce to Kafka, rejecting only the 
individual records that are invalid? For example, given a flow file with 100 
records, 2 of which are not valid against the schema: the 98 valid records 
continue on and are published, and the 2 invalid ones are rejected and routed 
down a failure queue.

We are on NiFi v1.11.4 and are looking to upgrade to v1.12.1 in the coming 
months.

Kind Regards,

Nathan
