Yes, I think it ends up being the same thing.

If you create multiple threads that all use the
same DataLakeStoreFileSystemManagementClient,
or if you set Concurrent Tasks to more than 1 in the UI, both will potentially
run into the problem in the Microsoft SDK.
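
Until the SDK is fixed, one possible workaround is to serialize every call that goes through the shared client. The sketch below is only an illustration of that idea: UnsafeClient is a hypothetical stand-in that mimics the put/iterate/clear pattern of AutoRestBaseUrl quoted further down, and the class, method, and URL names are made up for the example rather than taken from the Azure SDK or NiFi.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SerializedClientSketch {

    /** Hypothetical stand-in for a client that mutates shared state on every request. */
    static class UnsafeClient {
        private final Map<String, String> mappings = new HashMap<>();

        String createFile(String account, String path) {
            // Same pattern as the quoted SDK code: put, iterate, then clear a shared map.
            mappings.put("{accountName}", account);
            String url = "https://{accountName}.example.invalid/" + path;
            for (Map.Entry<String, String> entry : mappings.entrySet()) {
                url = url.replace(entry.getKey(), entry.getValue());
            }
            mappings.clear();
            return url;
        }
    }

    private final UnsafeClient client = new UnsafeClient();
    private final Object clientLock = new Object();

    /** Only one thread at a time may touch the shared client. */
    String createFileSerialized(String account, String path) {
        synchronized (clientLock) {
            return client.createFile(account, path);
        }
    }

    /** Runs the guarded call from several threads and returns how many calls built the expected URL. */
    static int runConcurrently(int threads, int callsPerThread) {
        SerializedClientSketch sketch = new SerializedClientSketch();
        AtomicInteger succeeded = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final String account = "acct" + t;
            pool.submit(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    String url = sketch.createFileSerialized(account, "f" + i);
                    if (url.equals("https://" + account + ".example.invalid/f" + i)) {
                        succeeded.incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return succeeded.get();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(8, 200));
    }
}
```

The trade-off is that the guarded call no longer runs in parallel, so extra concurrent tasks only help with the work done outside the lock. Creating a separate client per thread might avoid that cost, but whether the SDK supports it safely is an assumption you would need to verify.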

On Thu, Jun 9, 2016 at 2:49 PM, Kumiko Yada <kumiko.y...@ds-iq.com> wrote:

> Hi Bryan,
>
> Does this mean that even if I create multiple threads in onTrigger, I will
> still hit the Microsoft SDK issue where it is not thread-safe?  It sounds
> like what I am trying to do and creating multiple threads
> via the UI might basically be the same thing.
>
> Thanks
> Kumiko
> ------------------------------
> *From:* Bryan Bende <bbe...@gmail.com>
> *Sent:* Thursday, June 9, 2016 11:26:10 AM
> *To:* users@nifi.apache.org
> *Cc:* Kevin Verhoeven; Ki Kang
>
> *Subject:* Re: Custom processor is failing for concurrency
>
> Kumiko,
>
> In general you shouldn't have to create threads in your processors, with
> the exception of some special cases.
> The framework has a thread pool and it takes one of those threads and
> calls the onTrigger method of your processor.
>
> If you want multiple threads to call onTrigger, then each processor has a
> Concurrent Tasks property in the UI on the scheduling tab,
> which equates to the number of threads that will concurrently call
> onTrigger.
>
> A processor developer needs to only worry about the business logic in the
> onTrigger method, and needs to ensure
> thread-safe access to any member variables or state stored in the
> processor.
>
> Hope that helps.
>
> -Bryan
>
>
> On Thu, Jun 9, 2016 at 2:11 PM, Kumiko Yada <kumiko.y...@ds-iq.com> wrote:
>
>> Microsoft found that this is an issue with the SDK. They are working on a
>> fix but do not have an ETA for it.  To work around this issue, I’m trying
>> to create multiple threads using AbstractSessionFactoryProcessor and
>> handle the file creation in a single thread.   My problem is that
>> the single thread is not working correctly.  The processor is still acting
>> like a single thread.
>>
>>
>>
>> When I create a thread to handle the file creation, do I have to call
>> this method using java.util.concurrent.ExecutorService?
>>
>>
>>
>> Are there any sample processors that I can take a look at?
>>
>>
>>
>> Thanks
>>
>> Kumiko
>>
>>
>>
>> *From:* Kumiko Yada [mailto:kumiko.y...@ds-iq.com]
>> *Sent:* Sunday, June 5, 2016 6:28 PM
>> *To:* users@nifi.apache.org
>> *Cc:* Ki Kang <ki.k...@ds-iq.com>; Kevin Verhoeven <
>> kevin.verhoe...@ds-iq.com>
>> *Subject:* RE: Custom processor is failing for concurrency
>>
>>
>>
>> Thank you, Bryan.  I’m working with Microsoft on this issue.  Will keep
>> you guys updated.
>>
>>
>>
>> Thanks
>>
>> Kumiko
>>
>>
>>
>> *From:* Bryan Bende [mailto:bbe...@gmail.com <bbe...@gmail.com>]
>> *Sent:* Friday, June 3, 2016 2:32 PM
>> *To:* users@nifi.apache.org
>> *Subject:* Re: Custom processor is failing for concurrency
>>
>>
>>
>> It is hard to say for sure, but I think your NiFi processor is generally
>> ok regarding thread safety, but I think there could be a problem in the
>> Azure SDK code...
>>
>>
>>
>> RequestFactory has an instance of BaseUrl and every time
>> RequestFactory.create() is called, it calls BaseUrl.url().
>>
>>
>>
>> The implementation of BaseUrl is the following (according to my IntelliJ
>> attaching the sources...):
>>
>>
>>
>> public class AutoRestBaseUrl implements BaseUrl {
>>     /** A template based URL with variables wrapped in {}s. */
>>     private String template;
>>     /** a mapping from {} wrapped variables in the template and their actual values. */
>>     private Map<CharSequence, String> mappings;
>>
>>     @Override
>>     public HttpUrl url() {
>>         String url = template;
>>         for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
>>             url = url.replace(entry.getKey(), entry.getValue());
>>         }
>>         mappings.clear();
>>         return HttpUrl.parse(url);
>>     }
>>
>>     /**
>>      * Creates an instance of a template based URL.
>>      *
>>      * @param url the template based URL to use.
>>      */
>>     public AutoRestBaseUrl(String url) {
>>         this.template = url;
>>         this.mappings = new HashMap<>();
>>     }
>>
>>     /**
>>      * Sets the value for the {} wrapped variables in the template URL.
>>      * @param matcher the {} wrapped variable to replace.
>>      * @param value the value to set for the variable.
>>      */
>>     public void set(CharSequence matcher, String value) {
>>         this.mappings.put(matcher, value);
>>     }
>> }
>>
>>
>>
>> The exception is coming from the line where it is looping over the
>> entryset:
>>
>>
>>
>> for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
>>
>>
>>
>> Right after that loop it calls mappings.clear(), so if the RequestFactory
>> is shared by multiple threads (which I think it is), then one thread could
>> be iterating over the set while another calls mappings.clear().
>>
>>
>>
>>
>>
>> On Fri, Jun 3, 2016 at 5:09 PM, Oleg Zhurakousky <
>> ozhurakou...@hortonworks.com> wrote:
>>
>> Kumiko
>>
>>
>>
>> It appears that the current state of the source you linked is not in
>> sync with what is in the stack trace. Perhaps you have made some code
>> modifications (e.g., line 218 is an empty line in the code while the
>> stack trace points to it).
>>
>> In any event, from what I can see the error is coming from the Azure
>> libraries (not NiFi). Specifically,
>> ‘com.microsoft.rest.AutoRestBaseUrl.url(..)’ seems to be doing some
>> iteration where I presume the remove is called. Perhaps it is not a
>> thread-safe class after all. What does the Microsoft documentation say? Have
>> you looked at the source to see what’s going on there? If it’s open, please
>> link it and we can take a look.
>>
>>
>>
>> Cheers
>>
>> Oleg
>>
>>
>>
>> On Jun 3, 2016, at 4:58 PM, Kumiko Yada <kumiko.y...@ds-iq.com> wrote:
>>
>>
>>
>> Here is the code, https://github.com/kyada1/PutFileAzureDLStore.
>>
>>
>>
>> Thanks
>>
>> Kumiko
>>
>>
>>
>> *From:* Bryan Bende [mailto:bbe...@gmail.com <bbe...@gmail.com>]
>> *Sent:* Friday, June 3, 2016 12:57 PM
>> *To:* users@nifi.apache.org
>> *Subject:* Re: Custom processor is failing for the custom processor
>>
>>
>>
>> Hello,
>>
>>
>>
>> Would you be able to share your code for PutFileAzureDLStore so we can
>> help identify if there is a concurrency problem?
>>
>>
>>
>> -Bryan
>>
>>
>>
>> On Fri, Jun 3, 2016 at 3:39 PM, Kumiko Yada <kumiko.y...@ds-iq.com>
>> wrote:
>>
>> Hello,
>>
>>
>>
>> I wrote the following custom service control and processor.  When the
>> custom processor is running concurrently, it often fails with several
>> different errors.  Is there any special handling for concurrency that I
>> need to add in the custom processor?  I wrote a sample Java program that
>> does the same thing as the custom processor (authenticate every time the
>> file is created/create a file, create 2 threads and run concurrently), and
>> it works fine.  The custom processor also works fine when it is not running
>> concurrently.
>>
>>
>>
>> *Custom service control – set the properties for the Microsoft Azure
>> Datalake Store*
>>
>> *Custom processor – authenticate, then create a file in Microsoft Azure
>> Datalake Store*
>>
>>
>>
>> Error1:
>>
>> 2016-06-03 12:29:31,942 INFO [pool-2815-thread-1]
>> c.m.aad.adal4j.AuthenticationAuthority [Correlation ID:
>> 64c89876-2f7b-4fbb-8e6f-fb395a47aa87] Instance discovery was successful
>>
>> 2016-06-03 12:29:31,946 ERROR [Timer-Driven Process Thread-10]
>> n.a.d.processors.PutFileAzureDLStore
>>
>> java.util.ConcurrentModificationException: null
>>
>>                 at
>> java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) ~[na:1.8.0_77]
>>
>>                 at
>> java.util.HashMap$EntryIterator.next(HashMap.java:1463) ~[na:1.8.0_77]
>>
>>                 at
>> java.util.HashMap$EntryIterator.next(HashMap.java:1461) ~[na:1.8.0_77]
>>
>>                 at
>> com.microsoft.rest.AutoRestBaseUrl.url(AutoRestBaseUrl.java:28) ~[na:na]
>>
>>                 at
>> retrofit2.RequestFactory.create(RequestFactory.java:50) ~[na:na]
>>
>>                 at
>> retrofit2.OkHttpCall.createRawCall(OkHttpCall.java:181) ~[na:na]
>>
>>                 at retrofit2.OkHttpCall.execute(OkHttpCall.java:165)
>> ~[na:na]
>>
>>                 at
>> com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432)
>> ~[na:na]
>>
>>                 at
>> nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252)
>> ~[na:na]
>>
>>                 at
>> nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218)
>> ~[na:na]
>>
>>                 at
>> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>> ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>
>>                 at
>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
>> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>
>>                 at
>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
>> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>
>>                 at
>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>
>>                 at
>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
>> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>
>>                 at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> [na:1.8.0_77]
>>
>>                 at
>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> [na:1.8.0_77]
>>
>>                 at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> [na:1.8.0_77]
>>
>>                 at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> [na:1.8.0_77]
>>
>>                 at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> [na:1.8.0_77]
>>
>>                 at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> [na:1.8.0_77]
>>
>>                 at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
>>
>>
>>
>> Error2:
>>
>> 2016-06-03 12:29:24,913 WARN [Timer-Driven Process Thread-5]
>> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
>> PutFileAzureDLStore[id=2133fdad-54b7-46a5-9bcf-958ddd8cfac0] due to
>> uncaught Exception: com.microsoft.rest.ServiceException: Invalid status
>> code 403
>>
>> 2016-06-03 12:29:24,915 WARN [Timer-Driven Process Thread-5]
>> o.a.n.c.t.ContinuallyRunProcessorTask
>>
>> com.microsoft.rest.ServiceException: Invalid status code 403
>>
>>                 at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> ~[na:1.8.0_77]
>>
>>                 at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> ~[na:1.8.0_77]
>>
>>                 at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> ~[na:1.8.0_77]
>>
>>                 at
>> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> ~[na:1.8.0_77]
>>
>>                 at
>> com.microsoft.rest.ServiceResponseBuilder.build(ServiceResponseBuilder.java:147)
>> ~[na:na]
>>
>>                 at
>> com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.createDelegate(FileSystemOperationsImpl.java:1491)
>> ~[na:na]
>>
>>                 at
>> com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432)
>> ~[na:na]
>>
>>                 at
>> nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252)
>> ~[na:na]
>>
>>                 at
>> nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218)
>> ~[na:na]
>>
>>                 at
>> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>> ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>
>>                 at
>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
>> ~[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>
>>                 at
>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
>> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>
>>                 at
>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>
>>                 at
>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
>> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>
>>                 at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> [na:1.8.0_77]
>>
>>                 at
>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> [na:1.8.0_77]
>>
>>                 at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> [na:1.8.0_77]
>>
>>                 at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> [na:1.8.0_77]
>>
>>                 at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> [na:1.8.0_77]
>>
>>                 at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> [na:1.8.0_77]
>>
>>
>>
>> Thanks
>>
>> Kumiko
>>
>>
>>
>>
>>
>
>
