Yes, I think it ends up being the same thing. If you create multiple threads that all use the same DataLakeStoreFileSystemManagementClient, or if you set Concurrent Tasks above 1 in the UI, both can run into the problem in the Microsoft SDK.
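If that diagnosis holds and the unsafe state lives inside each client instance rather than in some static shared object (an assumption, not something confirmed in this thread), one workaround is to never share a client across threads: give each thread that runs onTrigger its own instance. Below is a minimal sketch using java.lang.ThreadLocal. SdkClient is a hypothetical stand-in for DataLakeStoreFileSystemManagementClient, with its real construction and authentication not shown. The fixed thread pool only roughly approximates the framework calling onTrigger with Concurrent Tasks set above 1.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PerThreadClientSketch {

    // HYPOTHETICAL stand-in for the SDK client; this is NOT the real
    // DataLakeStoreFileSystemManagementClient API.
    static final class SdkClient {
        static final AtomicInteger CREATED = new AtomicInteger();
        final int id = CREATED.incrementAndGet();

        String createFile(String path) {
            return "client-" + id + " created " + path;
        }
    }

    // Each thread lazily builds its own client on first use, so no client
    // instance (or its internal mutable state) is shared between threads.
    static final ThreadLocal<SdkClient> CLIENT =
            ThreadLocal.withInitial(SdkClient::new);

    // Plays the role of onTrigger: may be called by several threads at once.
    static String onTrigger(String path) {
        return CLIENT.get().createFile(path);
    }

    // Roughly simulates the framework calling onTrigger from `threads`
    // pooled threads; returns how many clients this run created.
    static int runDemo(int threads, int calls) {
        int before = SdkClient.CREATED.get();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < calls; i++) {
            final String path = "/tmp/file-" + i;
            pool.execute(() -> onTrigger(path));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return SdkClient.CREATED.get() - before;
    }

    public static void main(String[] args) {
        // Never more than one client per pool thread, i.e. at most 4 here.
        System.out.println("clients created: " + runDemo(4, 100));
    }
}
```

A ThreadLocal value lives as long as its thread, and the framework's threads are pooled, so a real processor would also need to close and discard these clients when it is stopped; that cleanup is not shown. The simpler alternative, wrapping every SDK call in one synchronized block on the shared client, is also safe, but it serializes all SDK calls and so gives up most of the benefit of extra concurrent tasks.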
On Thu, Jun 9, 2016 at 2:49 PM, Kumiko Yada <kumiko.y...@ds-iq.com> wrote:

> Hi Bryan,
>
> Does this mean that even if I create multiple threads in onTrigger, I will
> still hit the Microsoft SDK issue where it's not thread-safe? It sounds
> like what I am trying to do and creating multiple threads via the UI
> might basically be the same thing.
>
> Thanks
> Kumiko
> ------------------------------
> *From:* Bryan Bende <bbe...@gmail.com>
> *Sent:* Thursday, June 9, 2016 11:26:10 AM
> *To:* users@nifi.apache.org
> *Cc:* Kevin Verhoeven; Ki Kang
> *Subject:* Re: Custom processor is failing for concurrency
>
> Kumiko,
>
> In general you shouldn't have to create threads in your processors, with
> the exception of some special cases.
> The framework has a thread pool; it takes one of those threads and
> calls the onTrigger method of your processor.
>
> If you want multiple threads to call onTrigger, each processor has a
> Concurrent Tasks property in the UI on the Scheduling tab, which equates
> to the number of threads that will concurrently call onTrigger.
>
> A processor developer only needs to worry about the business logic in the
> onTrigger method, and needs to ensure thread-safe access to any member
> variables or state stored in the processor.
>
> Hope that helps.
>
> -Bryan
>
>
> On Thu, Jun 9, 2016 at 2:11 PM, Kumiko Yada <kumiko.y...@ds-iq.com> wrote:
>
>> Microsoft found that this is an issue with the SDK. They are working on a
>> fix, but they do not have an ETA for it. To work around this issue, I’m
>> trying to create multiple threads using AbstractSessionFactoryProcessor
>> and handle the file creation in a single thread. The problem is that the
>> single thread is not working correctly: the processor is still acting
>> like a single thread.
>>
>> When I create a thread to handle the file creation, do I have to call
>> this method using java.util.concurrent.ExecutorService?
>>
>> Are there any sample processors that I can take a look at?
>>
>> Thanks
>> Kumiko
>>
>> *From:* Kumiko Yada [mailto:kumiko.y...@ds-iq.com]
>> *Sent:* Sunday, June 5, 2016 6:28 PM
>> *To:* users@nifi.apache.org
>> *Cc:* Ki Kang <ki.k...@ds-iq.com>; Kevin Verhoeven <kevin.verhoe...@ds-iq.com>
>> *Subject:* RE: Custom processor is failing for concurrency
>>
>> Thank you, Bryan. I’m working with Microsoft on this issue. I will keep
>> you guys updated.
>>
>> Thanks
>> Kumiko
>>
>> *From:* Bryan Bende [mailto:bbe...@gmail.com <bbe...@gmail.com>]
>> *Sent:* Friday, June 3, 2016 2:32 PM
>> *To:* users@nifi.apache.org
>> *Subject:* Re: Custom processor is failing for concurrency
>>
>> It is hard to say for sure, but I think your NiFi processor is generally
>> OK regarding thread safety, and that there could be a problem in the
>> Azure SDK code...
>>
>> RequestFactory has an instance of BaseUrl, and every time
>> RequestFactory.create() is called, it calls BaseUrl.url().
>>
>> The implementation of BaseUrl is the following (according to the sources
>> IntelliJ attached...):
>>
>> public class AutoRestBaseUrl implements BaseUrl {
>>     /** A template based URL with variables wrapped in {}s. */
>>     private String template;
>>     /** a mapping from {} wrapped variables in the template and their actual values. */
>>     private Map<CharSequence, String> mappings;
>>
>>     @Override
>>     public HttpUrl url() {
>>         String url = template;
>>         for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
>>             url = url.replace(entry.getKey(), entry.getValue());
>>         }
>>         mappings.clear();
>>         return HttpUrl.parse(url);
>>     }
>>
>>     /**
>>      * Creates an instance of a template based URL.
>>      *
>>      * @param url the template based URL to use.
>>      */
>>     public AutoRestBaseUrl(String url) {
>>         this.template = url;
>>         this.mappings = new HashMap<>();
>>     }
>>
>>     /**
>>      * Sets the value for the {} wrapped variables in the template URL.
>>      * @param matcher the {} wrapped variable to replace.
>>      * @param value the value to set for the variable.
>>      */
>>     public void set(CharSequence matcher, String value) {
>>         this.mappings.put(matcher, value);
>>     }
>> }
>>
>> The exception is coming from the line where it loops over the
>> entry set:
>>
>>     for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
>>
>> Right after that loop it calls mappings.clear(), so if the RequestFactory
>> is shared by multiple threads (which I think it is), one thread could
>> be iterating over the set while another calls mappings.clear().
>>
>> On Fri, Jun 3, 2016 at 5:09 PM, Oleg Zhurakousky <ozhurakou...@hortonworks.com> wrote:
>>
>> Kumiko
>>
>> It appears that the current state of the source you linked to is not in
>> sync with what is in the stack trace. Perhaps you have made some code
>> modifications (e.g., line 218 is an empty line in the code, while the
>> stack trace points to it).
>>
>> In any event, from what I can see the error is coming from the Azure
>> libraries (not NiFi). Specifically,
>> ‘com.microsoft.rest.AutoRestBaseUrl.url(..)’ seems to be doing some
>> iteration where I presume a remove is called. Perhaps it is not a
>> thread-safe class after all. What does the Microsoft documentation say?
>> Have you looked at the source to see what’s going on there? If it's open,
>> please link it and we can take a look.
>>
>> Cheers
>> Oleg
>>
>> On Jun 3, 2016, at 4:58 PM, Kumiko Yada <kumiko.y...@ds-iq.com> wrote:
>>
>> Here is the code: https://github.com/kyada1/PutFileAzureDLStore.
>>
>> Thanks
>> Kumiko
>>
>> *From:* Bryan Bende [mailto:bbe...@gmail.com <bbe...@gmail.com>]
>> *Sent:* Friday, June 3, 2016 12:57 PM
>> *To:* users@nifi.apache.org
>> *Subject:* Re: Custom processor is failing for the custom processor
>>
>> Hello,
>>
>> Would you be able to share your code for PutFileAzureDLStore so we can
>> help identify whether there is a concurrency problem?
>>
>> -Bryan
>>
>> On Fri, Jun 3, 2016 at 3:39 PM, Kumiko Yada <kumiko.y...@ds-iq.com> wrote:
>>
>> Hello,
>>
>> I wrote the following custom controller service and processor. When the
>> custom processor runs concurrently, it often fails with several
>> different errors. Is there any special handling for concurrency that I
>> need to add to the custom processor? I wrote a sample Java program that
>> does the same thing as the custom processor (authenticate each time a
>> file is created, create the file, and run 2 threads concurrently), and
>> it works fine. The custom processor also works fine when it is not
>> running concurrently.
>>
>> *Custom controller service: sets the properties for the Microsoft Azure
>> Data Lake Store*
>>
>> *Custom processor: authenticates, then creates a file in the Microsoft
>> Azure Data Lake Store*
>>
>> Error1:
>>
>> 2016-06-03 12:29:31,942 INFO [pool-2815-thread-1] c.m.aad.adal4j.AuthenticationAuthority [Correlation ID: 64c89876-2f7b-4fbb-8e6f-fb395a47aa87] Instance discovery was successful
>>
>> 2016-06-03 12:29:31,946 ERROR [Timer-Driven Process Thread-10] n.a.d.processors.PutFileAzureDLStore
>>
>> java.util.ConcurrentModificationException: null
>>     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) ~[na:1.8.0_77]
>>     at java.util.HashMap$EntryIterator.next(HashMap.java:1463) ~[na:1.8.0_77]
>>     at java.util.HashMap$EntryIterator.next(HashMap.java:1461) ~[na:1.8.0_77]
>>     at com.microsoft.rest.AutoRestBaseUrl.url(AutoRestBaseUrl.java:28) ~[na:na]
>>     at retrofit2.RequestFactory.create(RequestFactory.java:50) ~[na:na]
>>     at retrofit2.OkHttpCall.createRawCall(OkHttpCall.java:181) ~[na:na]
>>     at retrofit2.OkHttpCall.execute(OkHttpCall.java:165) ~[na:na]
>>     at com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432) ~[na:na]
>>     at nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252) ~[na:na]
>>     at nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218) ~[na:na]
>>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
>>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
>>
>> Error2:
>>
>> 2016-06-03 12:29:24,913 WARN [Timer-Driven Process Thread-5] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PutFileAzureDLStore[id=2133fdad-54b7-46a5-9bcf-958ddd8cfac0] due to uncaught Exception: com.microsoft.rest.ServiceException: Invalid status code 403
>>
>> 2016-06-03 12:29:24,915 WARN [Timer-Driven Process Thread-5] o.a.n.c.t.ContinuallyRunProcessorTask
>>
>> com.microsoft.rest.ServiceException: Invalid status code 403
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_77]
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_77]
>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_77]
>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_77]
>>     at com.microsoft.rest.ServiceResponseBuilder.build(ServiceResponseBuilder.java:147) ~[na:na]
>>     at com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.createDelegate(FileSystemOperationsImpl.java:1491) ~[na:na]
>>     at com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432) ~[na:na]
>>     at nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252) ~[na:na]
>>     at nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218) ~[na:na]
>>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057) ~[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
>>
>> Thanks
>> Kumiko
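Bryan's reading of AutoRestBaseUrl earlier in this thread is that set() fills a shared HashMap, while url() iterates over that map and then calls clear(). Two threads sharing one instance can therefore hit exactly the ConcurrentModificationException in Error1. The general cure for that pattern is to stop sharing mutable state: keep only the immutable template in the object and pass the substitutions in on each call. The sketch below illustrates the idea under stated assumptions. ImmutableUrlTemplate, the {accountName} placeholder, and the example.net host are all hypothetical. It returns a plain String instead of OkHttp's HttpUrl so that it stays dependency-free. It is neither the AutoRest API nor whatever fix Microsoft ships.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// HYPOTHETICAL illustration; not the AutoRest AutoRestBaseUrl API.
public final class ImmutableUrlTemplate {
    // The only shared state is an immutable String.
    private final String template;

    public ImmutableUrlTemplate(String template) {
        this.template = template;
    }

    // Substitutions are passed per call, so every caller iterates over its
    // own map; there is no shared map for another thread to clear().
    public String url(Map<String, String> mappings) {
        String url = template;
        for (Map.Entry<String, String> entry : mappings.entrySet()) {
            url = url.replace(entry.getKey(), entry.getValue());
        }
        return url;
    }

    // Builds `count` URLs concurrently from one shared template instance,
    // returning them in submission order.
    static List<String> buildConcurrently(ImmutableUrlTemplate template, int count) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                final String account = "acct" + i;
                futures.add(pool.submit(
                        () -> template.url(Collections.singletonMap("{accountName}", account))));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        ImmutableUrlTemplate template =
                new ImmutableUrlTemplate("https://{accountName}.example.net/files");
        List<String> urls = buildConcurrently(template, 1000);
        System.out.println(urls.get(0)); // prints https://acct0.example.net/files
    }
}
```

Synchronizing set() and url() individually would not be enough if callers use a set()-then-url() sequence. Thread B's set() can overwrite thread A's value, or B's url() can clear A's mapping, in the gap between A's two calls. Passing the mappings as an argument removes that gap entirely.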