Re: PutTCP connector not cleaning up dangling connections

2017-09-19 Thread ddewaele
I've let it run overnight on 1.4.0-SNAPSHOT. Didn't see any hanging
connections and after timeouts they were cleaned up.

However, I noticed something else (perhaps unrelated). About 40% of the
messages that we "get" from the TCP connection contain "noise / garbage"
and don't pass their checksum. On 1.1.0 we never saw that.

If I manually "put" data on the TCP connection (via a telnet session) to
trigger a response, I don't see this "noise / garbage". So it seems to
originate from PutTCP.

Any pointers? I'm going to investigate further today and check the detailed
release notes (as I am coming from 1.1.0).
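For reference, a rough sketch of the kind of validation involved is below. Our actual
checksum scheme isn't spelled out in this thread, so the CRC32-over-payload layout
used here is purely an assumption for illustration.

import java.util.zip.CRC32;

// Hypothetical helper for checking a received TCP payload against a CRC32
// trailer. The real wire format isn't described in this thread; the layout
// below (payload bytes followed by a 4-byte big-endian CRC32) is assumed.
public final class PayloadCheck {

    public static boolean crcMatches(final byte[] message) {
        if (message == null || message.length < 5) {
            return false; // need at least 1 payload byte plus a 4-byte trailer
        }
        final int payloadLen = message.length - 4;

        // CRC32 computed over the payload portion only
        final CRC32 crc = new CRC32();
        crc.update(message, 0, payloadLen);
        final long computed = crc.getValue();

        // Read the stored big-endian CRC32 from the last 4 bytes
        long stored = 0;
        for (int i = payloadLen; i < message.length; i++) {
            stored = (stored << 8) | (message[i] & 0xFF);
        }
        return computed == stored;
    }
}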





--
Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/


Re: New ListFTP (and possibly ListXXX) processors fail to pickup new files

2017-09-19 Thread Joe Witt
Gino

When the primary node shifts, the new primary node should take over the
function of doing the listings for you.  You'll want to be using the
distributed cache property of the ListFTP etc. processors to help
keep state in such cases.
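As a rough illustration (not the actual ListFTP source), the listing position a
ListXXX processor keeps can be stored through NiFi's state manager with cluster
scope, which is what lets a newly elected primary node pick up where the old one
left off. The state key name below is made up for the example.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;

// Illustrative sketch only: keeping a listing position in cluster-scoped state
// so that a newly elected primary node can resume listings.
public class ListingStateSketch {

    private static final String LAST_LISTED_KEY = "last.listed.timestamp"; // hypothetical key

    public long loadLastListedTimestamp(final StateManager stateManager) throws IOException {
        // Cluster scope: every node (including a new primary) reads the same value
        final StateMap state = stateManager.getState(Scope.CLUSTER);
        final String value = state.get(LAST_LISTED_KEY);
        return value == null ? 0L : Long.parseLong(value);
    }

    public void saveLastListedTimestamp(final StateManager stateManager, final long timestamp) throws IOException {
        final Map<String, String> newState = new HashMap<>();
        newState.put(LAST_LISTED_KEY, String.valueOf(timestamp));
        stateManager.setState(newState, Scope.CLUSTER);
    }
}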

Thanks

On Tue, Sep 19, 2017 at 4:50 PM, Gino Lisignoli  wrote:
> Hi
>
> I can confirm that the cause of my problem is the primary node shifting to a
> new node in my cluster. This causes all of the ListXXX processors to stop
> working (as they are set to only run on my primary node). I'm not sure what
> is causing my primary node to shift, or why the processors are unable to
> recover when the node shifts. If I find any more info I'll raise a ticket.
>
> On Thu, Sep 14, 2017 at 8:08 AM, Joe Witt  wrote:
>>
>> Thanks Gino.  If you can confirm/replicate the behavior please do file a
>> jira.
>>
>> Thanks
>>
>> On Wed, Sep 13, 2017 at 4:06 PM, Gino Lisignoli 
>> wrote:
>> > In 1.4.0-SNAPSHOT, the ListFTP processor fails to add new files after
>> > several hours.
>> >
>> > At the moment I'm testing this on a cluster of 2. But I'm looking to
>> > replicate my issue on a single instance as well.
>> >
>> > I'm not sure if this is a cluster state problem, a processor state
>> > problem
>> > or the new ListXXX commits
>> >
>> > (https://github.com/apache/nifi/commit/e68ff153e81ddb82d1136d44a96bdb7a70da86d1
>> > and
>> >
>> > https://github.com/apache/nifi/commit/28ee70222b892fb799f5f74a31a9de678d9fb629).
>> >
>> > I'll be looking at those commits first around the ListFTP processor,
>> > unless there's any advice that can be offered.
>> >
>> > Should I submit this to Jira issues, or wait until I can
>> > confirm/replicate
>> > the behaviour?
>
>


Re: New ListFTP (and possibly ListXXX) processors fail to pickup new files

2017-09-19 Thread Gino Lisignoli
Hi

I can confirm that the cause of my problem is the primary node shifting to
a new node in my cluster. This causes all of the ListXXX processors to stop
working (as they are set to only run on my primary node). I'm not sure what
is causing my primary node to shift, or why the processors are unable to
recover when the node shifts. If I find any more info I'll raise a ticket.

On Thu, Sep 14, 2017 at 8:08 AM, Joe Witt  wrote:

> Thanks Gino.  If you can confirm/replicate the behavior please do file a
> jira.
>
> Thanks
>
> On Wed, Sep 13, 2017 at 4:06 PM, Gino Lisignoli 
> wrote:
> > In 1.4.0-SNAPSHOT, the ListFTP processor fails to add new files after
> > several hours.
> >
> > At the moment I'm testing this on a cluster of 2. But I'm looking to
> > replicate my issue on a single instance as well.
> >
> > I'm not sure if this is a cluster state problem, a processor state
> problem
> > or the new ListXXX commits
> > (https://github.com/apache/nifi/commit/e68ff153e81ddb82d1136d44a96bdb7a70da86d1
> > and
> > https://github.com/apache/nifi/commit/28ee70222b892fb799f5f74a31a9de678d9fb629).
> >
> > I'll be looking at those commits first around the ListFTP processor, unless
> > there's any advice that can be offered.
> >
> > Should I submit this to Jira issues, or wait until I can
> confirm/replicate
> > the behaviour?
>


Re: AttributesToJSON

2017-09-19 Thread Joe Witt
Ha!  They are nearly as cool as nifi reading bedtime stories.  You
have a good point.

I was all happy we were about to make your flow far
better/faster/stronger.  Then you threw down with HL7.

We really need to make an HL7RecordReader; then the rest of this would
be fast/fun.  Any volunteers?
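For anyone who does volunteer, here is a very rough sketch of the shape such a
reader could take against NiFi's record API. The exact RecordReader contract
differs slightly across NiFi versions, the segment-per-record schema is just a
placeholder, and real HL7v2 parsing would more likely go through a library such
as HAPI.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

// Rough sketch: treats each HL7v2 segment (one line) as a record with two
// string fields. A real HL7RecordReader would derive a richer schema.
public class Hl7SegmentRecordReader implements RecordReader {

    private static final List<RecordField> FIELDS = Arrays.asList(
            new RecordField("segment", RecordFieldType.STRING.getDataType()),
            new RecordField("content", RecordFieldType.STRING.getDataType()));
    private static final RecordSchema SCHEMA = new SimpleRecordSchema(FIELDS);

    private final BufferedReader reader;

    public Hl7SegmentRecordReader(final InputStream in) {
        this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    }

    @Override
    public Record nextRecord() throws IOException, MalformedRecordException {
        String line;
        while ((line = reader.readLine()) != null) {
            line = line.trim();
            if (line.isEmpty()) {
                continue; // skip blank lines between messages
            }
            if (line.length() < 3) {
                throw new MalformedRecordException("Segment too short: " + line);
            }
            final Map<String, Object> values = new HashMap<>();
            values.put("segment", line.substring(0, 3)); // e.g. MSH, PID, OBX
            values.put("content", line);
            return new MapRecord(SCHEMA, values);
        }
        return null; // end of input
    }

    // Newer NiFi versions declare a two-argument variant; delegate to the simple one.
    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields)
            throws IOException, MalformedRecordException {
        return nextRecord();
    }

    @Override
    public RecordSchema getSchema() {
        return SCHEMA;
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}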

Thanks

On Tue, Sep 19, 2017 at 2:09 PM, Charlie Frasure
 wrote:
> Thanks Joe,
>
> I'm using the HL7 processor to extract HL7v2 data to attributes, then
> mapping the attributes to expected JSON entries.  I am using the Record
> reader/writers elsewhere, definitely the best thing that has happened to
> NiFi since bedtime stories [1].
> So my current flow is:
>
> GetFile (leave original file) ->
> ExtractHL7Attributes ->
> UpdateAttribute (for light conversions) ->
> AttributesToJSON (as flowfile-content) ->
> JoltTransformJSON (This could probably be replaced by record readers /
> writers) ->
> InvokeHTTP (call webservice) ->
> FetchFile (using filename attribute)
>
> There are some additional exception paths, but this flow works as intended
> except when the web service can't keep up with new files.  I have a delay
> built in to GetFile to account for this, which mostly works, but sometimes
> we pull the same file more than once.  I suppose I could also move the file
> to an interim folder to prevent multiple reads.
>
> Thanks,
> Charlie
>
>
> [1]
> https://community.hortonworks.com/articles/28380/nifi-ocr-using-apache-nifi-to-read-childrens-books.html
>
>
> On Tue, Sep 19, 2017 at 11:35 AM, Joe Witt  wrote:
>>
>> Charlie
>>
>> You'll absolutely want to look at the Record reader/writer
>> capabilities.  It will help you convert from the CSV (or similar) to
>> JSON without having to go through attributes at all.
>>
>> Take a look here
>>
>> https://cwiki.apache.org/confluence/display/NIFI/Example+Dataflow+Templates
>> and you could see the provenance example for configuration.  If you
>> want to share a sample line of the delimited data and a sample of the
>> output JSON I can share you back a template that would help you get
>> started.
>>
>> Thanks
>> Joe
>>
>> On Tue, Sep 19, 2017 at 11:29 AM, Charlie Frasure
>>  wrote:
>> > I have a data flow that takes delimited input using GetFile, extracts
>> > some
>> > of that into attributes, converts the attributes to a JSON object,
>> > reformats
>> > the JSON using the Jolt transformer, and then does additional processing
>> > before using PutFile to move the original file based on the dataflow
>> > result.
>> > I have to work around NiFi to make the last step happen.
>> >
>> > I am setting the AttributesToJSON to replace the flowfile content
>> > because
>> > the Jolt transformer requires the JSON object to be in the flowfile
>> > content.
>> > There is no "original" relationship out of AttributesToJSON, so this
>> > data
>> > would be lost.  I have the "Keep Source File" set to true on the
>> > GetFile,
>> > and then use PutFile with the filename to grab it later.
>> >
>> > This works for the most part, but under heavy data loads we have some
>> > errors
>> > trying to process a file more than once.
>> >
>> > I think we could resolve this by not keeping the source file, sending a
>> > duplicate of the content down another path and merging later.  I want to
>> > explore the possibility of either 1) having an "original" relationship
>> > whenever the previous flowfile content is being modified or replaced, or
>> > 2)
>> > maintaining an "original" flowfile content alongside the working content
>> > so
>> > that it is easily available once the processing is complete.
>> >
>> > Am I missing a more direct way to process this data?  Other thoughts?
>> >
>> > Thanks,
>> > Charlie
>> >
>> >
>> >
>> >
>
>


Re: PutTCP connector not cleaning up dangling connections

2017-09-19 Thread Joe Witt
Thanks Davy - I'll get it reviewed and merged if you can give it a go
and verify it makes your case better.

Thanks

On Tue, Sep 19, 2017 at 1:57 PM, ddewaele  wrote:
> Hi,
>
> Trying it out now. Forgot how long it takes to build :)
>
> Will give feedback here.
>
> Thx for the client port logging also ... that is always useful for
> debugging. Perhaps we can check later in what way we can retrieve it in
> the timeout scenarios / standard close scenario.
>
> Really hope this makes it into the 1.4.0 release.
>
>
>
> --
> Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/


Re: AttributesToJSON

2017-09-19 Thread Charlie Frasure
Thanks Joe,

I'm using the HL7 processor to extract HL7v2 data to attributes, then
mapping the attributes to expected JSON entries.  I am using the Record
reader/writers elsewhere, definitely the best thing that has happened to
NiFi since bedtime stories [1].
So my current flow is:

GetFile (leave original file) ->
ExtractHL7Attributes ->
UpdateAttribute (for light conversions) ->
AttributesToJSON (as flowfile-content) ->
JoltTransformJSON (This could probably be replaced by record readers /
writers) ->
InvokeHTTP (call webservice) ->
FetchFile (using filename attribute)

There are some additional exception paths, but this flow works as intended
except when the web service can't keep up with new files.  I have a delay
built in to GetFile to account for this, which mostly works, but sometimes
we pull the same file more than once.  I suppose I could also move the file
to an interim folder to prevent multiple reads.

Thanks,
Charlie


[1]
https://community.hortonworks.com/articles/28380/nifi-ocr-using-apache-nifi-to-read-childrens-books.html


On Tue, Sep 19, 2017 at 11:35 AM, Joe Witt  wrote:

> Charlie
>
> You'll absolutely want to look at the Record reader/writer
> capabilities.  It will help you convert from the CSV (or similar) to
> JSON without having to go through attributes at all.
>
> Take a look here
> https://cwiki.apache.org/confluence/display/NIFI/Example+Dataflow+Templates
> and you could see the provenance example for configuration.  If you
> want to share a sample line of the delimited data and a sample of the
> output JSON I can share you back a template that would help you get
> started.
>
> Thanks
> Joe
>
> On Tue, Sep 19, 2017 at 11:29 AM, Charlie Frasure
>  wrote:
> > I have a data flow that takes delimited input using GetFile, extracts
> some
> > of that into attributes, converts the attributes to a JSON object,
> reformats
> > the JSON using the Jolt transformer, and then does additional processing
> > before using PutFile to move the original file based on the dataflow
> result.
> > I have to work around NiFi to make the last step happen.
> >
> > I am setting the AttributesToJSON to replace the flowfile content because
> > the Jolt transformer requires the JSON object to be in the flowfile
> content.
> > There is no "original" relationship out of AttributesToJSON, so this data
> > would be lost.  I have the "Keep Source File" set to true on the GetFile,
> > and then use PutFile with the filename to grab it later.
> >
> > This works for the most part, but under heavy data loads we have some
> errors
> > trying to process a file more than once.
> >
> > I think we could resolve this by not keeping the source file, sending a
> > duplicate of the content down another path and merging later.  I want to
> > explore the possibility of either 1) having an "original" relationship
> > whenever the previous flowfile content is being modified or replaced, or
> 2)
> > maintaining an "original" flowfile content alongside the working content
> so
> > that it is easily available once the processing is complete.
> >
> > Am I missing a more direct way to process this data?  Other thoughts?
> >
> > Thanks,
> > Charlie
> >
> >
> >
> >
>


Re: PutTCP connector not cleaning up dangling connections

2017-09-19 Thread ddewaele
Hi,

Trying it out now. Forgot how long it takes to build :)

Will give feedback here.

Thx for the client port logging also ... that is always useful for
debugging. Perhaps we can check later in what way we can retrieve it in
the timeout scenarios / standard close scenario.

Really hope this makes it into the 1.4.0 release.



--
Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/


Re: InferAvroSchema OoM w/ large json

2017-09-19 Thread Neil Derraugh
I was imagining that inference using only the "first ten records" would not
require parsing the entire file to infer the schema; I get it now.

I was hoping to use InferAvroSchema to split the records into more
manageable chunks.  So for a generalized solution it's a chicken-and-egg
thing.  However, for a less generalized solution I can certainly skip the
inference step altogether for now.
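As an aside, one way to sample the array outside of NiFi would be to stream it
with Jackson and copy only the first N elements into a small file, then point
InferAvroSchema at that sample instead of the full 328MB. A rough sketch (file
names and the element count are placeholders):

import java.io.File;
import java.io.IOException;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

// Sketch of an out-of-band sampling step (not a NiFi processor): stream the
// large top-level JSON array and copy only the first maxElements elements
// into a small file that InferAvroSchema can handle.
public class JsonArraySampler {

    public static void sample(final File bigArrayJson, final File sampleJson, final int maxElements)
            throws IOException {
        final JsonFactory factory = new JsonFactory();
        try (JsonParser parser = factory.createParser(bigArrayJson);
             JsonGenerator generator = factory.createGenerator(sampleJson, JsonEncoding.UTF8)) {

            if (parser.nextToken() != JsonToken.START_ARRAY) {
                throw new IOException("Expected a single top-level JSON array");
            }

            generator.writeStartArray();
            int copied = 0;
            // copyCurrentStructure copies one whole element at a time, so the
            // rest of the array is never materialized in memory.
            while (copied < maxElements && parser.nextToken() != JsonToken.END_ARRAY) {
                generator.copyCurrentStructure(parser);
                copied++;
            }
            generator.writeEndArray();
        }
    }

    public static void main(final String[] args) throws IOException {
        sample(new File("big.json"), new File("sample.json"), 10);
    }
}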

Thanks Jeremy!

On Thu, Sep 14, 2017 at 10:00 PM, Jeremy Dyer  wrote:

> Neil - The number of records analyzed property in conjunction with JSON
> content is a little hairy. It's pretty clear with something like CSV where
> each line is considered a record. With JSON that distinction becomes less
> clear on what is actually a "record", i.e. is the record the object or each
> element in the array? Ultimately NiFi invokes the Kite SDK to handle this
> task. Kite attempts to parse the JSON and iterate through X number of
> assumed Avro schemas/records. I would assume that Kite is probably
> attempting to read in this entire array of objects and causing your OOM
> error.
>
> I would feed in a smaller set of data to the InferAvroSchema processor and
> then use the resulting output schema from that run on subsequent processors,
> removing InferAvroSchema from the flow altogether. I am making the
> assumption that this data is always the same. If that is not the case this
> will not work and we can look at other options.
>
> Don't worry, we will get it going at some point.
>
> - Jeremy Dyer
>
>
>
> Sent from my iPhone
> On Sep 14, 2017, at 5:13 PM, Neil Derraugh wrote:
> >
> > I have a 328MB json file sitting in a queue that feeds into an
> InferAvroSchema (1.3.0) processor that is currently stopped.  The json file
> consists of a single array containing just over 1.5M small objects.  The
> heap is set to 4GB.  Before starting the processor the heap usage on the
> node that has the file is about 20%. If I start the InferAvroSchema
> eventually I run out of heap and start seeing the following in the logs.
> >
> > 2017-09-14 13:01:25,597 WARN [Clustering Tasks Thread-3] o.apache.nifi.controller.FlowController Failed to send heartbeat due to: org.apache.nifi.cluster.protocol.ProtocolException: Failed marshalling 'HEARTBEAT' protocol message due to: javax.net.ssl.SSLException: Received fatal alert: unexpected_message
> > 2017-09-14 13:01:25,655 INFO [Curator-ConnectionStateManager-0] o.a.n.c.l.e.CuratorLeaderElectionManager org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager$ElectionListener@27ae11db Connection State changed to SUSPENDED
> > 2017-09-14 13:01:25,690 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl Background operation retry gave up
> > org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
> >   at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
> >   at org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)
> >   at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
> >   at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> >   at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
> >   at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
> >   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >   at java.lang.Thread.run(Thread.java:748)
> > 2017-09-14 13:01:25,691 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl Background retry gave up
> > org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
> >   at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:838)
> >   at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
> >   at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
> >   at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
> >   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >   at java.util.concurrent.ScheduledThreadPoolExecutor$
> 

Re: AttributesToJSON

2017-09-19 Thread Joe Witt
Charlie

You'll absolutely want to look at the Record reader/writer
capabilities.  It will help you convert from the CSV (or similar) to
JSON without having to go through attributes at all.

Take a look here
https://cwiki.apache.org/confluence/display/NIFI/Example+Dataflow+Templates
and you could see the provenance example for configuration.  If you
want to share a sample line of the delimited data and a sample of the
output JSON I can share you back a template that would help you get
started.

Thanks
Joe

On Tue, Sep 19, 2017 at 11:29 AM, Charlie Frasure
 wrote:
> I have a data flow that takes delimited input using GetFile, extracts some
> of that into attributes, converts the attributes to a JSON object, reformats
> the JSON using the Jolt transformer, and then does additional processing
> before using PutFile to move the original file based on the dataflow result.
> I have to work around NiFi to make the last step happen.
>
> I am setting the AttributesToJSON to replace the flowfile content because
> the Jolt transformer requires the JSON object to be in the flowfile content.
> There is no "original" relationship out of AttributesToJSON, so this data
> would be lost.  I have the "Keep Source File" set to true on the GetFile,
> and then use PutFile with the filename to grab it later.
>
> This works for the most part, but under heavy data loads we have some errors
> trying to process a file more than once.
>
> I think we could resolve this by not keeping the source file, sending a
> duplicate of the content down another path and merging later.  I want to
> explore the possibility of either 1) having an "original" relationship
> whenever the previous flowfile content is being modified or replaced, or 2)
> maintaining an "original" flowfile content alongside the working content so
> that it is easily available once the processing is complete.
>
> Am I missing a more direct way to process this data?  Other thoughts?
>
> Thanks,
> Charlie
>
>
>
>


AttributesToJSON

2017-09-19 Thread Charlie Frasure
I have a data flow that takes delimited input using GetFile, extracts some
of that into attributes, converts the attributes to a JSON object,
reformats the JSON using the Jolt transformer, and then does additional
processing before using PutFile to move the original file based on the
dataflow result.  I have to work around NiFi to make the last step happen.

I am setting the AttributesToJSON to replace the flowfile content because
the Jolt transformer requires the JSON object to be in the flowfile
content.  There is no "original" relationship out of AttributesToJSON, so
this data would be lost.  I have the "Keep Source File" set to true on the
GetFile, and then use PutFile with the filename to grab it later.

This works for the most part, but under heavy data loads we have some
errors trying to process a file more than once.

I think we could resolve this by not keeping the source file, sending a
duplicate of the content down another path and merging later.  I want to
explore the possibility of either 1) having an "original" relationship
whenever the previous flowfile content is being modified or replaced, or 2)
maintaining an "original" flowfile content alongside the working content so
that it is easily available once the processing is complete.

Am I missing a more direct way to process this data?  Other thoughts?

Thanks,
Charlie


Re: Removing duplicates from data

2017-09-19 Thread Koji Kawamura
Hi Vikram,

PutDatabaseRecord has been available since NiFi 1.2.0. So you need to
upgrade your NiFi installation to use that.
Other than PutDatabaseRecord, there are three possible ways to remove
duplicates that I can think of:

1. Disable PutSQL's 'Support Fragmented Transaction' property
2. Use DetectDuplicate; you need to use ExtractJSONPath in advance to create
the "Cache Entry Identifier" value from Col A and B
3. If your PostgreSQL is 9.5 or higher, use an INSERT ... ON CONFLICT UPDATE
statement to do an 'upsert' operation

#1 Since you split an Avro dataset via SplitAvro, those fragmented
FlowFiles share the same 'fragment.identifier' attribute, and PutSQL
executes the insert statements for them as a single batch operation.
If there is a primary key constraint violation, the whole batch of insert
statements will be rolled back. But if you disable 'Support Fragmented
Transaction', each insert will be committed individually, and if you set up
the right primary key constraint on PostgreSQL, you will get the expected
behavior: the duplicated record will fail.
However, this approach will provide lower database update throughput by not
using batch inserts.

#2 NiFi also has a DetectDuplicate processor. Please see the processor usage
doc for details.
In short, you would extract the Col A and B values using ExtractJSONPath into
FlowFile attributes, e.g. col.a and col.b, then use those as the "Cache Entry
Identifier" at DetectDuplicate with NiFi Expression Language, for example
${col.a}::${col.b}.
Then the DetectDuplicate processor can route FlowFiles to 'duplicate' if the
same col.a and col.b value pair has already been seen, e.g. the 2nd "C2::item3"
in your example data set.
This approach has a limitation on the number of entries that can be cached,
meaning that if the same key arrives after the previous one has already been
invalidated from the cache, it can't be detected.

#3 I personally haven't tested this, but you may be able to construct a
flow that executes an 'upsert' operation so that you don't have to worry
about duplicates.
https://stackoverflow.com/questions/17267417/how-to-upsert-merge-insert-on-duplicate-update-in-postgresql
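As a rough illustration of what #3 could look like outside of NiFi (table,
column, and connection details are placeholders; it assumes PostgreSQL 9.5+
and a unique constraint over col_a and col_b, and uses DO NOTHING to skip
duplicates rather than DO UPDATE for a true upsert):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Minimal upsert-style sketch: duplicate (col_a, col_b) pairs are skipped
// instead of failing the whole batch. Requires a unique constraint on
// (col_a, col_b) in the target table.
public class UpsertSketch {

    public static void insertIgnoringDuplicates(final String url, final String user, final String password,
                                                final String colA, final String colB) throws SQLException {
        final String sql = "INSERT INTO target_table (col_a, col_b) VALUES (?, ?) "
                + "ON CONFLICT (col_a, col_b) DO NOTHING";

        try (Connection conn = DriverManager.getConnection(url, user, password);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, colA);
            ps.setString(2, colB);
            ps.executeUpdate(); // returns 0 when the row already existed
        }
    }
}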

Thanks,
Koji

On Tue, Sep 19, 2017 at 1:18 PM, Vikram More 
wrote:

> I could not find 'PutDatabaseRecord' in the NiFi version I am using:
> 1.1.0.2.1.2.0-10. Please suggest?
>
> On Tue, Sep 19, 2017 at 12:10 AM, Vikram More 
> wrote:
>
>> Hi Koji,
>> Thanks for response and helpful links !
>>
>> NiFi version : 1.1.0.2.1.2.0-10
>>
>> I am trying to move data from an operational system (Oracle DB) to an
>> analytical system (Postgres DB). The Postgres table has been modeled/designed
>> by us (and we can add a primary key). Data from Oracle looks like below (I
>> need to remove duplicate records for the combination of Col A and Col B):
>>
>> Col A | Col B
>> C1    | item 1
>> C1    | item 2
>> *C2*  | *item 3*
>> *C2*  | *item 4*
>> *C2*  | *item 3*
>> C3    | item 1
>> C4    | null
>> C5    | item 5
>> C6    | item 7
>> I will try to explore the PutDatabaseRecord processor and see if I can
>> achieve the desired purpose.
>>
>> Thanks,
>> Vikram
>>
>> On Mon, Sep 18, 2017 at 9:59 PM, Koji Kawamura 
>> wrote:
>>
>>> Hello Vikram,
>>>
>>> Welcome to NiFi and the community :)
>>>
>>> Would you elaborate on your data flow? And which version are you using?
>>> For example, can you share some input data extracted from Oracle? I
>>> wonder why you need to remove duplicate records while the PostgreSQL table
>>> doesn't have a primary key constraint, or why you have such records in
>>> the first place.
>>>
>>> The current PutSQL does not report the cause of a batch update failure well.
>>> But that behavior has been improved, and you can see what the cause is
>>> if you use NiFi 1.4.0-SNAPSHOT (you need to build NiFi from source
>>> code to try it).
>>> https://issues.apache.org/jira/browse/NIFI-4162
>>>
>>> Please refer to the NiFi README.md for how to build and run NiFi from
>>> source code.
>>> https://github.com/apache/nifi
>>>
>>> Also, in order to put Avro data into an RDBMS, NiFi also has a
>>> PutDatabaseRecord processor today, which can work more efficiently
>>> because you don't need the 'split avro -> avrotojson -> jsontosql'
>>> part; PutDatabaseRecord can directly execute DML statements from an Avro
>>> dataset.
>>> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.3.0/org.apache.nifi.processors.standard.PutDatabaseRecord/index.html
>>>
>>> Thanks,
>>> Koji
>>>
>>> On Tue, Sep 19, 2017 at 9:21 AM, Vikram More 
>>> wrote:
>>> > Hi Everyone,
>>> >
>>> > I am new to NiFi and community :)
>>> >
>>> > I am trying to build a NiFi flow which will pull from an Oracle table
>>> > and load into a Postgres table. My select query has two columns and I
>>> > need to remove duplicates based on these two columns. Can I remove
>>> > duplicates in NiFi based on two column data values? My flow is like
>>> > below:
>>> > ExecuteSQL -> split avro -> avrotojson -> jsontosql -> PutSQL
>>> >
>>> >
>>> > PutSQL question : Oracle table has ~ 4 million