Hi Luis,

Glad to hear that works.
There are two approaches I can think of to accumulate and combine
multiple service results. The goal is to create a single FlowFile
containing all service call results, right?

1. Use DistributedCache (DC) as a shared heap space
- Each part of the flow calls an external service and stores its result in the DC
- Use cache keys in a format like "OriginalRequestID_serviceType". For example, "OriginalIncomingFlowFileUUID_serviceA"
- Use Notify as well to let Wait know the progress. Use serviceType as the counter name
- After Wait finishes, use LookupAttribute or LookupRecord to enrich the original FlowFile by fetching the accumulated results from the DC using DistributedMapCacheLookupService

2. Use the MergeContent processor
- Route the original FlowFile into multiple sub-flows, meaning cloning the same FlowFile
- At each sub-flow, call the external service and update the resulting FlowFile's attributes so that MergeContent can wait and merge them:
-- fragment.identifier: some common id
-- fragment.count: the total number of sub-flows
-- fragment.index: to control merge order
- Route each resulting FlowFile into the same MergeContent processor, and use the 'Defragment' Merge Strategy to merge FlowFiles based on the 'fragment.*' attributes
-- Use Header, Footer and Demarcator to combine the contents, e.g. to form valid JSON

I'm not sure how you want to loop, but here is another example flow doing a traditional 'for' loop in NiFi.
https://gist.github.com/ijokarumawak/01c4fd2d9291d3e74ec424a581659ca8

Hope these can be helpful.

Thanks,
Koji

On Wed, Jan 9, 2019 at 1:04 AM Luis Carmona <[email protected]> wrote:
>
> Hi Koji,
>
> I tried some manual tests and it seems to be working now; it doesn't have the
> problem I had before. Today I will try it with a massive flow, and that will
> be the final check.
>
> Thanks for your help.
> Do you know where I can get a sample about processing
> in a loop? I mean: send things to a server, wait for its answer, send to
> a second server, accumulating the answers of both, and all this in a finite
> loop determined by the answers, gathering all the answers in one final JSON.
>
> Thank you very much.
>
> LC
>
> ----- Original message -----
> From: "Koji Kawamura" <[email protected]>
> To: "users" <[email protected]>
> Sent: Monday, January 7, 2019 22:22:12
> Subject: Re: Wait/Notify inconsistent behavior
>
> Hi Luis,
>
> I look forward to hearing how it goes with a consolidated single Notify instance.
> Another benefit of doing so is that, by increasing 'Signal Buffer Count',
> the number of updates against the Distributed Cache Service can be
> decreased in your case, because multiple FlowFiles share the same
> signal id. The Notify processor can merge counter deltas locally and then
> update the cache entry only once.
>
> Thanks,
> Koji
>
> On Tue, Jan 8, 2019 at 3:50 AM Luis Carmona <[email protected]> wrote:
> >
> > Hi Koji,
> >
> > Thanks for taking the time to answer my question.
> >
> > In the Wait processor:
> > - Signal Counter Name is empty (default).
> > - Target Signal Count is set to 2.
> >
> > As for the Notify processor, I used two of them because
> > I had previously set ${filename} differently in the preceding
> > UpdateAttribute processors.
> >
> > I attached the image of both processors, and the template as well.
> >
> > The whole point of my layout is to send things to a server, wait for its answer,
> > send to a second server, accumulating the answers of both, and all
> > this in a finite loop determined by the answers, gathering all the answers
> > in one final JSON.
> >
> > For now I'm stuck on the Wait/Notify issue. Thanks for the sample, I'll look
> > into it. Then I will see how to get the loop working.
> >
> > Thanks a lot,
> >
> > Regards,
> >
> > LC
> >
> > ----- Original message -----
> > From: "Koji Kawamura" <[email protected]>
> > To: "users" <[email protected]>
> > Sent: Sunday, January 6, 2019 23:42:56
> > Subject: Re: Wait/Notify inconsistent behavior
> >
> > The reason to put two Notify processors is that I'm using different
> >
> > Hi Luis,
> >
> > Just a quick question: how are the "Signal Counter Name" and "Target
> > Signal Count" properties configured for the Wait processor?
> > If you'd like to wait for the two sub-flow branches to complete, then:
> > "Signal Counter Name" should be blank, meaning check the total count for
> > all counter names
> > "Target Signal Count" should be 2.
> >
> > If those are configured like that, then would you be able to share
> > your flow as a template for further investigation?
> >
> > One more thing: although the Notify processor cares about atomicity, due
> > to the underlying distributed cache mechanism, concurrent writes to
> > the same cache identifier can overwrite an existing signal, meaning one
> > of the two notifications can be lost.
> > To avoid this, using the same Notify instance at 3a and 3b in your
> > flow is highly recommended.
> > Here is an example flow to do that:
> > https://gist.github.com/ijokarumawak/6da4bd914236e941071cad103e1186dd
> >
> > Thanks,
> > Koji
> >
> > On Sat, Jan 5, 2019 at 11:28 AM Joe Witt <[email protected]> wrote:
> > >
> > > Thanks for letting us know. The lists can be a bit awkward from a user
> > > experience POV. No worries.
> > >
> > > On Fri, Jan 4, 2019, 9:26 PM Luis Carmona <[email protected] wrote:
> > >>
> > >> I'm sorry,
> > >>
> > >> I got messages from nifi-users saying "UNCHECKED" and, after reading about
> > >> it, understood that the message did not get through.
> > >>
> > >> Thanks for letting me know.
> > >>
> > >> LC
> > >>
> > >> ________________________________
> > >> From: "Joe Witt" <[email protected]>
> > >> To: "users" <[email protected]>
> > >> Sent: Friday, January 4, 2019 23:23:02
> > >> Subject: Re: Wait/Notify inconsistent behavior
> > >>
> > >> Please avoid sending more copies of the question. Hopefully someone
> > >> familiar with the processors in question will be available in time.
> > >>
> > >> Thanks
> > >>
> > >> On Fri, Jan 4, 2019 at 9:14 PM Luis Carmona <[email protected]>
> > >> wrote:
> > >>>
> > >>> Hi everyone,
> > >>>
> > >>> I'm seeing strange behavior with the Wait/Notify mechanism. Attached is
> > >>> the image of the flow.
> > >>> Basically I'm trying to insert two records into Elasticsearch
> > >>> simultaneously and, if both went OK, then insert a record into a BPM
> > >>> service.
> > >>>
> > >>> For that (in the image):
> > >>>
> > >>> - Step 1: Set the attribute fragment.identifier to 1
> > >>> - Step 2: Send the flow to the Wait state, and
> > >>>   for 2a, set the attribute filename to 'caso' (without the
> > >>>   quotes) just before the POST to Elasticsearch;
> > >>>   for 2b, set the attribute filename to 'doc' (without the
> > >>>   quotes) just before the other POST to Elasticsearch
> > >>> - Step 3: On 3a, once the insert is finished, I expect Notify
> > >>>   to send the signal to the Wait state, and the same for 3b.
> > >>> - Step 4: HERE is my problem:
> > >>>   Sometimes the Wait has count.caso = 1 and count.doc = 1, so
> > >>>   everything goes well on the RouteAttribute after step 5.
> > >>>   Other times it receives only one of the notifications, either
> > >>>   the 'doc' one or the 'caso' one, so, as the other never comes,
> > >>>   the first to arrive gets queued. I've checked, and the two
> > >>>   Elasticsearch insertions complete almost immediately, so that's not the problem.
> > >>> - Step 5: This is the expected path unless there was a real failure, but it is
> > >>>   not what is happening.
> > >>>
> > >>> Any help or tip would be much appreciated.
> > >>>
> > >>> Regards,
> > >>>
> > >>> LC
> > >>>
> > >>
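
For the MergeContent approach suggested at the top of this thread, it may help to see the 'Defragment' idea outside NiFi. The following is only a rough Python simulation, not NiFi code: the dict-based FlowFile representation, the function names `make_result` and `defragment`, and the sample service results are all assumptions made for illustration, and real MergeContent has further settings (bins, timeouts, and so on) that are ignored here. It groups results by fragment.identifier, waits until fragment.count results have arrived, orders them by fragment.index, and joins them with a header, footer and demarcator so the output is valid JSON.

```python
import json
from collections import defaultdict


def make_result(request_id, index, count, payload):
    """Build a FlowFile-like dict (hypothetical structure, not a NiFi API)."""
    return {
        "attributes": {
            "fragment.identifier": request_id,  # common id shared by all sub-flows
            "fragment.count": str(count),       # total number of sub-flows
            "fragment.index": str(index),       # controls merge order
        },
        "content": json.dumps(payload),
    }


def defragment(flowfiles, header="[", footer="]", demarcator=","):
    """Merge complete fragment groups; incomplete groups are left out (still waiting)."""
    groups = defaultdict(list)
    for ff in flowfiles:
        groups[ff["attributes"]["fragment.identifier"]].append(ff)

    merged = {}
    for identifier, fragments in groups.items():
        expected = int(fragments[0]["attributes"]["fragment.count"])
        if len(fragments) < expected:
            continue  # not every sub-flow has reported yet
        ordered = sorted(
            fragments, key=lambda ff: int(ff["attributes"]["fragment.index"])
        )
        body = demarcator.join(ff["content"] for ff in ordered)
        merged[identifier] = header + body + footer
    return merged


if __name__ == "__main__":
    arrived = [
        # Results arrive out of order; fragment.index restores the order.
        make_result("req-1", 1, 2, {"service": "B", "status": "ok"}),
        make_result("req-1", 0, 2, {"service": "A", "status": "ok"}),
        # Only one of two results has arrived for req-2, so it is not merged.
        make_result("req-2", 0, 2, {"service": "A", "status": "ok"}),
    ]
    for request_id, content in defragment(arrived).items():
        print(request_id, json.loads(content))
```

With "[" as header, "]" as footer and "," as demarcator, each merged content is a JSON array with one element per service call, which corresponds to the "one final JSON" asked about earlier in the thread.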
