OK, I just tried something like that and the performance is ridiculous - less
than a second!

SQL --> ConvertRecord (Avro to JSON) --> ReplaceText --> PutKafka

The key was to set Output Grouping on the JsonRecordSetWriter to One Line
Per Object:

After that, I can use ReplaceText against the entire flowFile to replace the
newline character with my custom delimiter and wrap each record under my
custom payload dict.
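For reference, the wrapping that ReplaceText performs here can be sketched outside NiFi like this (a Python illustration only, not the actual processor; the sample records and the meta values are placeholders):

```python
import json

# One JSON object per line, as produced by a JsonRecordSetWriter with
# Output Grouping = One Line Per Object (sample data for illustration).
flowfile = (
    '{"col1":"value11", "col2":"value21", "col3":"value31"}\n'
    '{"col1":"value12", "col2":"value22", "col3":"value32"}'
)

# Wrap each record under "payload", attach a "meta" dict, and join the
# records with a pipe instead of a newline (hypothetical meta values).
meta = {"info": "system1", "timestamp": "2010-10-01 12:23:33"}
wrapped = "|".join(
    json.dumps({"payload": json.loads(line), "meta": meta})
    for line in flowfile.splitlines()
)
print(wrapped)
```

In the actual flow, ReplaceText does this with a regex over the whole flowFile content rather than line-by-line Python.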

I am curious to see if I can get away with using Record processors all the
way, but performance is really good already.

P.S. I know I have said this many times here, but NiFi and the NiFi community
are awesome!


On Fri, Sep 21, 2018 at 4:29 PM Boris Tyukin <[email protected]> wrote:

> I understand, but I need to transform the JSON first as I described in my
> example (wrapping it under a payload dict and adding a meta dict with
> additional elements), so it is not a simple passthrough transformation.
>
> If I use record processors, they deal with individual fields, not the
> entire JSON record.
>
> On Fri, Sep 21, 2018 at 4:21 PM Matt Burgess <[email protected]> wrote:
>
>> With PublishKafkaRecord you don't need to do the split; you can pass in
>> the whole array and it will send each record as a message.
>>
>> Regards,
>> Matt
>>
>> On Sep 21, 2018, at 4:09 PM, Boris Tyukin <[email protected]> wrote:
>>
>> Hi Matt,
>>
>> It should work, but how would I do the wrapping part? And what about
>> performance? A single flowFile with a demarcator worked really fast for me
>> when I did a quick test - less than a second, versus 10 seconds when I used
>> the common split technique. And 10 seconds is a big deal in my case because
>> that flow needs to run every minute.
>>
>> Thanks Charlie, I was actually thinking of doing the same thing with a
>> custom Groovy processor but wanted to see if there was an easier way.
>>
>> On Fri, Sep 21, 2018 at 3:38 PM Matt Burgess <[email protected]>
>> wrote:
>>
>>> You should be able to use PublishKafkaRecord with an AvroReader (using
>>> the embedded schema) and Writer to avoid any conversion and delimiter
>>> issues. If you need JSON on the Kafka topic, you can use a
>>> JsonRecordSetWriter that inherits the schema from the AvroReader; it
>>> will do the conversion for you and output each record.
>>>
>>> Regards,
>>> Matt
>>>
>>> On Fri, Sep 21, 2018 at 3:25 PM Boris Tyukin <[email protected]>
>>> wrote:
>>> >
>>> > Hey guys,
>>> >
>>> > I have a flow returning thousands of records from an RDBMS. I convert
>>> the returned Avro to JSON and get something like below:
>>> >
>>> > [
>>> >   {"col1":"value11", "col2":"value21", "col3":"value31"},
>>> >   {"col1":"value12", "col2":"value22", "col3":"value32"},
>>> > ...
>>> > ]
>>> >
>>> > So it is still a single flowFile. Now I need to wrap every record in the
>>> array like this (an oversimplified example):
>>> >
>>> > {"payload": {"col1":"value11", "col2":"value21", "col3":"value31"},
>>> >   "meta": {"info": "system1", "timestamp":"2010-10-01 12:23:33"}
>>> > }|
>>> > {"payload": {"col1":"value12", "col2":"value22", "col3":"value32"},
>>> >   "meta": {"info": "system1", "timestamp":"2010-10-01 12:23:33"}
>>> > }
>>> >
>>> > Basically, I want to
>>> > 1) remove the root-level [] and replace the commas with pipes (see
>>> below why)
>>> > 2) keep a single flowFile without splitting, but wrap the source records
>>> under a payload dictionary and add another dictionary, meta, with some
>>> attributes
>>> > 3) avoid defining a schema upfront because it might change in the future
>>> >
>>> > I put the pipe there because I then want to publish these records to
>>> Kafka using the demarcator option - it works much faster for me than
>>> splitting the Avro/JSON into individual flowFiles.
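The demarcator behavior described above can be sketched like this (a Python illustration of what PublishKafka's Message Demarcator property does; the sample records are placeholders):

```python
# A single flowFile whose records are separated by a pipe becomes one
# Kafka message per record - PublishKafka splits the content on the
# configured demarcator (sample records for illustration).
flowfile = (
    '{"payload": {"col1": "value11"}, "meta": {"info": "system1"}}'
    '|'
    '{"payload": {"col1": "value12"}, "meta": {"info": "system1"}}'
)
messages = flowfile.split("|")  # what the Message Demarcator setting does
for m in messages:
    print(m)
```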
>>> >
>>> > Thanks for any ideas,
>>> > Boris
