With PublishKafkaRecord you don't need to do the split; you can pass in the whole array and it will send each record as its own message.
Regards,
Matt

> On Sep 21, 2018, at 4:09 PM, Boris Tyukin <[email protected]> wrote:
>
> Hi Matt,
>
> It should work, but how would I do the wrapping part? And what about
> performance? A single flowfile with a demarcator worked really fast for me
> when I did a quick test: less than a second versus 10 seconds when I used
> the common split technique. And 10 seconds is a big deal in my case, because
> that flow needs to run every minute.
>
> Thanks Charlie, I was actually thinking of doing the same thing with a custom
> Groovy processor, but wanted to see if there was an easier way.
>
>> On Fri, Sep 21, 2018 at 3:38 PM Matt Burgess <[email protected]> wrote:
>> You should be able to use PublishKafkaRecord with an AvroReader (using
>> the embedded schema) and Writer to avoid any conversion and delimiter
>> issues. If you need JSON on the Kafka topic, you can use a
>> JsonRecordSetWriter that inherits the schema from the AvroReader; it
>> will do the conversion for you and output each record.
>>
>> Regards,
>> Matt
>>
>> On Fri, Sep 21, 2018 at 3:25 PM Boris Tyukin <[email protected]> wrote:
>> >
>> > Hey guys,
>> >
>> > I have a flow returning thousands of records from an RDBMS. I convert
>> > the returned Avro to JSON and get something like the below:
>> >
>> > [
>> >   {"col1":"value11", "col2":"value21", "col3":"value31"},
>> >   {"col1":"value12", "col2":"value22", "col3":"value32"},
>> >   ...
>> > ]
>> >
>> > So still a single flowFile. Now I need to wrap every record like this
>> > (an oversimplified example here):
>> >
>> > [
>> >   {"payload": {"col1":"value11", "col2":"value21", "col3":"value31"},
>> >    "meta": {"info": "system1", "timestamp":"2010-10-01 12:23:33"}
>> >   }|
>> >   {"payload": {"col1":"value12", "col2":"value22", "col3":"value32"},
>> >    "meta": {"info": "system1", "timestamp":"2010-10-01 12:23:33"}
>> >   }
>> > |
>> > ]
>> >
>> > Basically, I want to:
>> > 1) remove the root-level [] and replace the commas with a pipe (see below why)
>> > 2) keep a single flowFile without splitting, but wrap each source record under
>> >    a payload dictionary and add another dictionary, meta, with some attributes
>> > 3) avoid defining a schema upfront, because it might change in the future
>> >
>> > I put a pipe because I then want to publish these records to Kafka using the
>> > demarcator option - it works much faster for me than splitting the Avro/JSON
>> > into individual flowfiles.
>> >
>> > Thanks for any ideas,
>> > Boris
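[Editor's note] The wrapping Boris describes (payload/meta envelope, pipe-delimited output, no schema defined up front) can be sketched as a short script. This is only an illustration of the transform, not NiFi code; in a flow it would live in something like an ExecuteScript processor, and the function name, sample values, and the hard-coded meta fields are all made up for the example. Note that a literal "|" inside a field value would break this simple demarcation scheme.

```python
import json


def wrap_records(flowfile_text, info="system1",
                 timestamp="2010-10-01 12:23:33"):
    """Wrap each record of a JSON array under "payload", attach a "meta"
    dict, and join the results with a pipe so the Kafka publisher can use
    "|" as the message demarcator. No schema is needed up front."""
    records = json.loads(flowfile_text)
    wrapped = [
        json.dumps({"payload": rec,
                    "meta": {"info": info, "timestamp": timestamp}})
        for rec in records
    ]
    # One output payload: pipe-delimited records, no surrounding [].
    return "|".join(wrapped)


sample = '[{"col1":"value11", "col2":"value21"}, {"col1":"value12", "col2":"value22"}]'
print(wrap_records(sample))
```

Because the whole array is processed in one pass and emitted as a single string, this keeps the one-flowfile-per-batch shape that made the demarcator approach fast in Boris's test.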
