I understand, but I need to transform the JSON first, as I described in my
example (wrapping it under a payload dict and adding a meta dict with
additional elements). So it is not a simple passthrough transformation.

If I use record processors, they deal with individual elements, not the
entire JSON record.
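To make it concrete, the whole transformation is roughly this (a sketch in plain Python for clarity, not NiFi code; the function name, the "system1" value, and the timestamp are just illustrative):

```python
import json
from datetime import datetime, timezone

def wrap_records(json_array_text, source_system="system1"):
    # Parse the incoming JSON array (the single flowfile's content).
    records = json.loads(json_array_text)
    # One shared timestamp for the meta dict, formatted like my example.
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    # Wrap each record under "payload" and attach a "meta" dict.
    wrapped = [
        json.dumps({"payload": rec,
                    "meta": {"info": source_system, "timestamp": ts}})
        for rec in records
    ]
    # Join with a pipe so the Kafka demarcator option can split messages;
    # the root-level [] and the commas between records are gone.
    return "|".join(wrapped)

if __name__ == "__main__":
    content = '[{"col1":"value11"},{"col1":"value12"}]'
    print(wrap_records(content))
```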

On Fri, Sep 21, 2018 at 4:21 PM Matt Burgess <[email protected]> wrote:

> With PublishKafkaRecord you don’t need to do the split, you can pass in
> the whole array and it will send each record as a message.
>
> Regards,
> Matt
>
> On Sep 21, 2018, at 4:09 PM, Boris Tyukin <[email protected]> wrote:
>
> Hi Matt,
>
> it should work, but how would I do the wrapping part? And what about
> performance? A single flowfile with a demarcator worked really fast for me
> when I did a quick test: less than a second, versus 10 seconds when I used
> a common split technique. And 10 seconds is a big deal in my case because
> that flow needs to run every minute.
>
> Thanks Charlie, I was actually thinking of doing the same thing with a
> custom Groovy processor but wanted to see if there was an easier way.
>
> On Fri, Sep 21, 2018 at 3:38 PM Matt Burgess <[email protected]> wrote:
>
>> You should be able to use PublishKafkaRecord with an AvroReader (using
>> the embedded schema)/Writer to avoid any conversion and delimiter
>> issues. If you need JSON on the Kafka topic you can use a
>> JsonRecordSetWriter that inherits the schema from the AvroReader, and
>> it will do the conversion for you, and output each record.
>>
>> Regards,
>> Matt
>>
>> On Fri, Sep 21, 2018 at 3:25 PM Boris Tyukin <[email protected]>
>> wrote:
>> >
>> > Hey guys,
>> >
>> > I have a flow returning thousands of records from an RDBMS; I convert
>> the returned Avro to JSON and get something like below:
>> >
>> > [
>> >   {"col1":"value11", "col2":"value21", "col3":"value31"},
>> >   {"col1":"value12", "col2":"value22", "col3":"value32"},
>> > ...
>> > ]
>> >
>> > So it is still a single flowFile. Now I need to wrap every record in
>> the array like this (an oversimplified example):
>> >
>> > [
>> > {"payload": {"col1":"value11", "col2":"value21", "col3":"value31"},
>> >   "meta": {"info": "system1", "timestamp":"2010-10-01 12:23:33"}
>> > }|
>> > {"payload": {"col1":"value12", "col2":"value22", "col3":"value32"},
>> >   "meta": {"info": "system1", "timestamp":"2010-10-01 12:23:33"}
>> > }
>> > ]
>> >
>> > Basically, I want to:
>> > 1) remove the root-level [] and replace each comma with a pipe (see
>> below for why)
>> > 2) keep a single flowFile without splitting, but wrap the source
>> records under a payload dictionary and add another meta dictionary with
>> some attributes
>> > 3) not define a schema upfront, because it might change in the future
>> >
>> > I use a pipe because I then want to publish these records to Kafka
>> using the demarcator option; it works much faster for me than splitting
>> the Avro/JSON into individual flowfiles.
>> >
>> > Thanks for any ideas,
>> > Boris
>> >
>>
>
