Carlos,

Do you have the "duplicate" relationship being routed back to the
DetectDuplicate?  Also if you are trying to do different tables in
"parallel" (which is just "concurrent" on a single NiFi instance), I
assume you have set Max Concurrent Tasks in IncrementalLoadData to
something greater than 1?  How are you populating your
DistributedMapCache?  With a "run-once" separate flow?

An issue I see here is that once the table name is removed from the
cache, then unless something "resets the lock", all future flow files
with that table name will still be processed. If for some reason you
are guaranteed to only have 2 duplicate flow files per table name,
this might work (although you would have to manually refresh the cache
before running again).  An alternative might be to replace
DetectDuplicate with PutDistributedMapCache, setting the Cache Update
Strategy to "keep original". Then the flow files will have a "cached"
attribute, where the first flow file (i.e. when the cache value is not
present) will have "cached" set to true and the remainder (until the
cache entry is removed) will have "cached" false, so you can use
RouteOnAttribute to send the non-cached flow files back to
PutDistributedMapCache.

In this solution you'd still be relying on a
"RemoveDistributedMapCache" processor which does not exist, but you
could use the Groovy script as described in my last response.

This "lock" idea is interesting, I will give it some more thought,
perhaps there is something we could do in the framework and/or
extensions to enable this, if it is a common use case.  There have
been different Jira cases and discussions about barriers and general
aggregation patterns, though I'm not sure how/if they'd apply here.

Regards,
Matt

On Mon, Feb 13, 2017 at 2:08 PM, Carlos Manuel Fernandes (DSI)
<carlos.antonio.fernan...@cgd.pt> wrote:
> Thanks Matt for your quickly response.
>
> My problem isn’t to process the same table twice,  but to guarantee I don’t 
> process the same table at the same time,   what I wish achieve is a 
> synchronized process for  each table.
>
> Regards
> Carlos
>
> -----Original Message-----
> From: Matt Burgess [mailto:mattyb...@apache.org]
> Sent: segunda-feira, 13 de Fevereiro de 2017 18:25
> To: users@nifi.apache.org
> Subject: Re: RemoveDistributedMapCache
>
> Carlos,
>
> With a RemoveDistributedMapCache processor in your suggested flow, there 
> might be an issue depending on when the duplicates are routed off.  For 
> example, if the first time we see the table name, that flow file gets all the 
> way through to RemoveDistributedMapCache before a duplicate has been detected 
> by DetectDuplicate, then the cache entry would be removed and you could 
> process the same table twice. I guess the question here is: how do you know 
> when you're "done" with the cache value?
>
> Also FWIW, speaking of my Groovy DCache script, you can use it (or parts of 
> it) in an ExecuteScript processor to emulate the functionality of a 
> RemoveDistributedMapCache processor.
>
> Regards,
> Matt
>
>
> On Mon, Feb 13, 2017 at 12:54 PM, Carlos Manuel Fernandes (DSI) 
> <carlos.antonio.fernan...@cgd.pt> wrote:
>> Hello,
>>
>>
>>
>> I ‘m using NIFI to replicate tables from one relational
>> Database(Mainframe) to other Database,  with incremental updates,
>> based on a timestamp and primary key. The process is made with tree custom 
>> processors:
>> GenerateListOfTablesToSyncronize  -> CreteTableIfNotExists ->
>> IncrementalLoadData.  If by mistake, in
>> GenerateListOfTablesToSyncronize i generate the same table twice, I
>> must guarantee the two flows run sequential not in parallel. For that, I 
>> need some kind of lock, and the  MapCache
>> processors seems to be the solution.    The solution I see is:
>>
>>
>>
>> GenerateListOfTablesToSyncronize  -> DetectDuplicte (tableName, with
>> no age
>> Off) ->CreteTableIfNotExists -> IncrementalLoadData –>
>> RemoveDistributedMapCache (tableName)
>>
>>
>>
>> Unfortunately there isn’t  the processor RemoveDistributedMapCache, I
>> could handle this,  thanks to Matt Burgess
>> (https://community.hortonworks.com/articles/71837/working-with-a-nifi-distributedmapcache.html)
>> which make possible manipulate directly the Cache using groovy.   No one
>> have this kind of requirement to justify the creation of
>> RemoveDistributedMapCache ?
>>
>>
>>
>> Thanks
>>
>>
>>
>> Carlos

Reply via email to