OK, I just tried something like that and the performance is ridiculous: less than a second!
SQL --> ConvertRecord (Avro to JSON) --> ReplaceText --> PutKafka

The key was to set Output Grouping on the JsonRecordSetWriter to One Line Per Object:

[image: image.png]

After that, I can use ReplaceText against the entire flowFile to replace the newline character with my custom delimiter and wrap each record under my custom payload dict. I am curious to see if I can get away with using Record processors all the way, but performance is really good already.

P.S. I know I have said this many times here, but NiFi and the NiFi community are awesome!

On Fri, Sep 21, 2018 at 4:29 PM Boris Tyukin <[email protected]> wrote:

> I understand, but I need to transform the JSON first, as I described in my
> example (wrapping it under a payload dict and adding a meta dict with
> additional elements). So it is not a simple passthrough transformation.
>
> If I use record processors, they deal with individual elements, not the
> entire JSON record.
>
> On Fri, Sep 21, 2018 at 4:21 PM Matt Burgess <[email protected]> wrote:
>
>> With PublishKafkaRecord you don't need to do the split; you can pass in
>> the whole array and it will send each record as a message.
>>
>> Regards,
>> Matt
>>
>> On Sep 21, 2018, at 4:09 PM, Boris Tyukin <[email protected]> wrote:
>>
>> Hi Matt,
>>
>> It should work, but how would I do the wrapping part? And what about
>> performance? A single flowFile with a demarcator worked really fast for me
>> when I did a quick test: less than a second, versus 10 seconds when I used
>> the common split technique. And 10 seconds is a big deal in my case,
>> because that flow needs to run every minute.
>>
>> Thanks Charlie, I was actually thinking of doing the same thing with a
>> custom Groovy processor but wanted to see if there was an easier way.
>>
>> On Fri, Sep 21, 2018 at 3:38 PM Matt Burgess <[email protected]>
>> wrote:
>>
>>> You should be able to use PublishKafkaRecord with an AvroReader (using
>>> the embedded schema)/Writer to avoid any conversion and delimiter
>>> issues.
>>> If you need JSON on the Kafka topic, you can use a
>>> JsonRecordSetWriter that inherits the schema from the AvroReader; it
>>> will do the conversion for you and output each record.
>>>
>>> Regards,
>>> Matt
>>>
>>> On Fri, Sep 21, 2018 at 3:25 PM Boris Tyukin <[email protected]>
>>> wrote:
>>> >
>>> > Hey guys,
>>> >
>>> > I have a flow returning thousands of records from an RDBMS. I convert
>>> > the returned Avro to JSON and get something like this:
>>> >
>>> > [
>>> > {"col1":"value11", "col2":"value21", "col3":"value31"},
>>> > {"col1":"value12", "col2":"value22", "col3":"value32"},
>>> > ...
>>> > ]
>>> >
>>> > So it is still a single flowFile. Now I need to wrap every record in
>>> > the array like this (an oversimplified example):
>>> >
>>> > [
>>> > {"payload": {"col1":"value11", "col2":"value21", "col3":"value31"},
>>> > "meta": {"info": "system1", "timestamp":"2010-10-01 12:23:33"}
>>> > }|
>>> > {"payload": {"col1":"value12", "col2":"value22", "col3":"value32"},
>>> > "meta": {"info": "system1", "timestamp":"2010-10-01 12:23:33"}
>>> > }
>>> > |
>>> > ]
>>> >
>>> > Basically, I want to:
>>> > 1) remove the root-level [] and replace the commas with a pipe (see
>>> > below for why);
>>> > 2) keep a single flowFile without splitting, but wrap the source
>>> > records under a payload dictionary and add another dictionary, meta,
>>> > with some attributes;
>>> > 3) avoid defining a schema upfront, because it might change in the
>>> > future.
>>> >
>>> > I chose a pipe because I then want to publish these records to Kafka
>>> > using the demarcator option -- it works much faster for me than
>>> > splitting the Avro/JSON into individual flowFiles.
>>> >
>>> > Thanks for any ideas,
>>> > Boris
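For reference, the transformation the thread converges on (one JSON object per line out of the JsonRecordSetWriter, each record wrapped under "payload" with a "meta" dict attached, records joined by a pipe demarcator) can be sketched in plain Python. This is only an illustration of the data shape, not NiFi code; the static `meta` dict is an assumption standing in for whatever attributes the real flow would supply:

```python
import json

# One record per line, as produced by JsonRecordSetWriter with
# Output Grouping = "One Line Per Object" (no root-level [ ]).
lines = [
    '{"col1":"value11", "col2":"value21", "col3":"value31"}',
    '{"col1":"value12", "col2":"value22", "col3":"value32"}',
]

# Hypothetical static meta block; in the real flow this could come
# from flowFile attributes instead.
meta = {"info": "system1", "timestamp": "2010-10-01 12:23:33"}

# Wrap each record under "payload", attach "meta", and join with the
# pipe that PublishKafka's demarcator setting will split on, so each
# record becomes a separate Kafka message.
wrapped = [json.dumps({"payload": json.loads(ln), "meta": meta}) for ln in lines]
message_body = "|".join(wrapped)

print(message_body)
```

Note the caveat that a pipe demarcator only works if "|" can never occur inside the record data itself, which is why NiFi's record-based PublishKafkaRecord approach (suggested earlier in the thread) is the safer general answer.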
