Kumiko,

In general you shouldn't have to create threads in your processors, with
the exception of some special cases.
The framework has a thread pool and it takes one of those threads and calls
the onTrigger method of your processor.

If you want multiple threads to call onTrigger, then each processor has a
Concurrent Tasks property in the UI on the scheduling tab,
which equates to the number of threads that will concurrently call
onTrigger.

A processor developer needs to only worry about the business logic in the
onTrigger method, and needs to ensure
thread-safe access to any member variables or state stored in the processor.

Hope that helps.

-Bryan


On Thu, Jun 9, 2016 at 2:11 PM, Kumiko Yada <kumiko.y...@ds-iq.com> wrote:

> Microsoft found this is an issue with the SDK, they are working on a fix,
> they do not have the ETA for the fix.  To workaround this issue, I’m trying
> to create the multiple threads in using AbstractSessionFactoryProcessor and
> handle the Create a file in a single thread.   I’m having a problem that
> the single thread is not working correctly.  The processor is still acting
> like a single thread.
>
>
>
> When I create a thread to handle the create a file, do I have to call this
> method using java.util.concurrent.ExecutorService?
>
>
>
> Are there any sample processors that I can take a look?
>
>
>
> Thanks
>
> Kumiko
>
>
>
> *From:* Kumiko Yada [mailto:kumiko.y...@ds-iq.com]
> *Sent:* Sunday, June 5, 2016 6:28 PM
> *To:* users@nifi.apache.org
> *Cc:* Ki Kang <ki.k...@ds-iq.com>; Kevin Verhoeven <
> kevin.verhoe...@ds-iq.com>
> *Subject:* RE: Custom processor is failing for concurrency
>
>
>
> Thank you, Bryan.  I’m working with Microsoft on this issue.  Will keep
> you guys updated.
>
>
>
> Thanks
>
> Kumiko
>
>
>
> *From:* Bryan Bende [mailto:bbe...@gmail.com <bbe...@gmail.com>]
> *Sent:* Friday, June 3, 2016 2:32 PM
> *To:* users@nifi.apache.org
> *Subject:* Re: Custom processor is failing for concurrency
>
>
>
> It is hard to say for sure, but I think your NiFi processor is generally
> ok regarding thread safety, but I think there could be a problem in the
> Azure SDK code...
>
>
>
> RequestFactory has an instance of BaseUrl and every time
> RequestFactory.create() is called, it calls BaseUrl.url().
>
>
>
> The implementation of BaseUrl is the following (according to my IntelliJ
> attaching the sources...):
>
>
>
> public class AutoRestBaseUrl implements BaseUrl {
> /** A template based URL with variables wrapped in {}s. */
> private String template;
> /** a mapping from {} wrapped variables in the template and their actual
> values. */
> private Map<CharSequence, String> mappings;
>
> @Override
> public HttpUrl url() {
> String url = template;
> for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
> url = url.replace(entry.getKey(), entry.getValue());
> }
> mappings.clear();
> return HttpUrl.parse(url);
> }
>
> /**
> * Creates an instance of a template based URL.
> *
> * @param url the template based URL to use.
> */
> public AutoRestBaseUrl(String url) {
> this.template = url;
> this.mappings = new HashMap<>();
> }
>
> /**
> * Sets the value for the {} wrapped variables in the template URL.
> * @param matcher the {} wrapped variable to replace.
> * @param value the value to set for the variable.
> */
> public void set(CharSequence matcher, String value) {
> this.mappings.put(matcher, value);
> }
> }
>
>
>
> The exception is coming from the line where it is looping over the
> entryset:
>
>
>
> for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
>
>
>
> Right after that loop it calls mappings.clear() so if the RequestFactory
> is shared by multiple threads (which I think it is), then one thread could
> be iterating over the set, which another calls mappings.clear().
>
>
>
>
>
> On Fri, Jun 3, 2016 at 5:09 PM, Oleg Zhurakousky <
> ozhurakou...@hortonworks.com> wrote:
>
> Kumiko
>
>
>
> It appears that the current state of the source you linked in is not in
> sync with what is in the stack trace. Perhaps you have made some code
> modifications (e.g., line 218 is an empty line in code while it has a
> pointer in the star trace).
>
> In any event, from what I can see the error is coming from Azure libraries
> (not NiFi). Specifically ‘com.microsoft.rest.AutoRestBaseUrl.url(..)’ seems
> to be doing some iteration where I presume the remove is called. Perhaps it
> is not a thread safe class after all. What does Microsoft documentation
> says? Have you looked at the source to see what’s going on there? If its
> open please link and we can tale a look.
>
>
>
> Cheers
>
> Oleg
>
>
>
> On Jun 3, 2016, at 4:58 PM, Kumiko Yada <kumiko.y...@ds-iq.com> wrote:
>
>
>
> Here is the code, https://github.com/kyada1/PutFileAzureDLStore.
>
>
>
> Thanks
>
> Kumiko
>
>
>
> *From:* Bryan Bende [mailto:bbe...@gmail.com <bbe...@gmail.com>]
> *Sent:* Friday, June 3, 2016 12:57 PM
> *To:* users@nifi.apache.org
> *Subject:* Re: Custom processor is failing for the custom processor
>
>
>
> Hello,
>
>
>
> Would you be able to share your code for PutFileAzureDLStore so we can
> help identify if there is a concurrency problem?
>
>
>
> -Bryan
>
>
>
> On Fri, Jun 3, 2016 at 3:39 PM, Kumiko Yada <kumiko.y...@ds-iq.com> wrote:
>
> Hello,
>
>
>
> I wrote the following custom service control and processor.  When the
> custom processor is running concurrently, it’s failing often with several
> different errors.  Are there any special handlings for concurrently that I
> need to add in the custom processor?  I wrote the sample Java program which
> does the same thing as the custom processor (authenticate every time the
> file is created/create a file, create 2 threads and run concurrently), it’s
> working fine.  The custom processor also fine when this is not running
> concurrently.
>
>
>
> *Custom service control – set the properties for the Microsoft Azure
> Datalake Store*
>
> *Custom processor – authenticate, then create a file in Microsoft Azure
> Datalake Store*
>
>
>
> Error1:
>
> 2016-06-03 12:29:31,942 INFO [pool-2815-thread-1]
> c.m.aad.adal4j.AuthenticationAuthority [Correlation ID:
> 64c89876-2f7b-4fbb-8e6f-fb395a47aa87] Instance discovery was successful
>
> 2016-06-03 12:29:31,946 ERROR [Timer-Driven Process Thread-10]
> n.a.d.processors.PutFileAzureDLStore
>
> java.util.ConcurrentModificationException: null
>
>                 at
> java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) ~[na:1.8.0_77]
>
>                 at java.util.HashMap$EntryIterator.next(HashMap.java:1463)
> ~[na:1.8.0_77]
>
>                 at java.util.HashMap$EntryIterator.next(HashMap.java:1461)
> ~[na:1.8.0_77]
>
>                 at
> com.microsoft.rest.AutoRestBaseUrl.url(AutoRestBaseUrl.java:28) ~[na:na]
>
>                 at retrofit2.RequestFactory.create(RequestFactory.java:50)
> ~[na:na]
>
>                 at retrofit2.OkHttpCall.createRawCall(OkHttpCall.java:181)
> ~[na:na]
>
>                 at retrofit2.OkHttpCall.execute(OkHttpCall.java:165)
> ~[na:na]
>
>                 at
> com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432)
> ~[na:na]
>
>                 at
> nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252)
> ~[na:na]
>
>                 at
> nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218)
> ~[na:na]
>
>                 at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
>                 at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
>                 at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
>                 at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
>                 at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
>                 at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [na:1.8.0_77]
>
>                 at
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> [na:1.8.0_77]
>
>                 at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> [na:1.8.0_77]
>
>                 at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> [na:1.8.0_77]
>
>                 at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [na:1.8.0_77]
>
>                 at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [na:1.8.0_77]
>
>                 at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
>
>
>
> Error2:
>
> 2016-06-03 12:29:24,913 WARN [Timer-Driven Process Thread-5]
> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
> PutFileAzureDLStore[id=2133fdad-54b7-46a5-9bcf-958ddd8cfac0] due to
> uncaught Exception: com.microsoft.rest.ServiceException: Invalid status
> code 403
>
> 2016-06-03 12:29:24,915 WARN [Timer-Driven Process Thread-5]
> o.a.n.c.t.ContinuallyRunProcessorTask
>
> com.microsoft.rest.ServiceException: Invalid status code 403
>
>                 at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> ~[na:1.8.0_77]
>
>                 at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> ~[na:1.8.0_77]
>
>                 at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> ~[na:1.8.0_77]
>
>                 at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> ~[na:1.8.0_77]
>
>                 at
> com.microsoft.rest.ServiceResponseBuilder.build(ServiceResponseBuilder.java:147)
> ~[na:na]
>
>                 at
> com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.createDelegate(FileSystemOperationsImpl.java:1491)
> ~[na:na]
>
>                 at
> com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432)
> ~[na:na]
>
>                 at
> nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252)
> ~[na:na]
>
>                 at
> nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218)
> ~[na:na]
>
>                 at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
>                 at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
> ~[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
>                 at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
>                 at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
>                 at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
>                 at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [na:1.8.0_77]
>
>                 at
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> [na:1.8.0_77]
>
>                 at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> [na:1.8.0_77]
>
>                 at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> [na:1.8.0_77]
>
>                 at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [na:1.8.0_77]
>
>                 at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [na:1.8.0_77]
>
>
>
> Thanks
>
> Kumiko
>
>
>
>
>

Reply via email to