Here is the code:
https://github.com/kyada1/PutFileAzureDLStore/blob/master/nifi-azure-dlstore-bundle/nifi-azure-dlstore-processors/src/main/java/nifi/azure/dlstore/processors/PutFileAzureDLStore.java
The problem lines are:
Line 209 - SetupClients(creds);
Line 217 - CreateFile(_path + _filename, value.get(), true);
To wrap these calls in synchronized blocks, do the following look correct? Or,
for line 217, does the method have to return something?
For line 209:
synchronized (this) {
SetupClients(creds);
if (creds!= null) {
return creds;
}
For line 217:
synchronized {
CreateFile(_path + _filename, value.get(), true);
}
Thanks
Kumiko
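For reference, a minimal sketch of how the two snippets above could be written so that they compile. In Java a synchronized block always needs a lock expression in parentheses, every opening brace needs its closing brace, and the guarded method does not have to return anything. The class name, the sdkLock field, and the stand-in bodies of setupClients and createFile are assumptions made for illustration; they are not the actual PutFileAzureDLStore code or the Azure SDK.

```java
import java.util.ArrayList;
import java.util.List;

class SynchronizedBlockSketch {
    // One private lock shared by every call into the non-thread-safe code.
    private final Object sdkLock = new Object();

    // Hypothetical stand-in for SDK state; ArrayList is not thread-safe.
    private final List<String> created = new ArrayList<>();
    private boolean clientsReady = false;

    // Stand-in for SetupClients(creds); it returns nothing.
    private void setupClients(String creds) {
        clientsReady = (creds != null);
    }

    // Stand-in for CreateFile(path, contents, overwrite); also void.
    private void createFile(String path, byte[] contents, boolean overwrite) {
        if (!clientsReady) {
            throw new IllegalStateException("clients not set up");
        }
        created.add(path);
    }

    // Roughly what the two guarded lines in onTrigger could look like.
    void putFile(String creds, String path, byte[] contents) {
        synchronized (sdkLock) {          // the lock expression is mandatory
            setupClients(creds);
        }
        synchronized (sdkLock) {          // same lock object as above
            createFile(path, contents, true);
        }
    }

    int createdCount() {
        synchronized (sdkLock) {
            return created.size();
        }
    }

    // Starts several threads that all call putFile on one shared instance.
    static int run(int threads, int filesPerThread) {
        SynchronizedBlockSketch processor = new SynchronizedBlockSketch();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < filesPerThread; i++) {
                    processor.putFile("creds", "/dir/file-" + id + "-" + i, new byte[0]);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return processor.createdCount();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // prints 4000
    }
}
```

Both blocks use the same lock object, so the two calls can never run at the same time on different threads. If the setup and the create must not be interleaved with another thread's calls at all, they could share a single synchronized block instead.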
From: Matt Foley [mailto:[email protected]]
Sent: Thursday, June 9, 2016 12:27 PM
To: [email protected]
Cc: Kevin Verhoeven <[email protected]>; Ki Kang <[email protected]>
Subject: Re: Custom processor is failing for concurrency
If there are multiple SDK calls that share the same problematic code (that is,
multiple SDK methods that would interact with each other in a non-thread-safe
way), then one must synchronize their calls to a shared lock object, which only
requires a couple more lines of code.
________________________________
From: Oleg Zhurakousky <[email protected]>
Sent: Thursday, June 09, 2016 12:15 PM
To: [email protected]
Cc: Kevin Verhoeven; Ki Kang
Subject: Re: Custom processor is failing for concurrency
+1, was just responding with the same.
On Jun 9, 2016, at 3:11 PM, Matt Foley <[email protected]> wrote:
Kumiko, would it be sufficient to just wrap your call to the non-thread-safe MS
SDK routine in a 'synchronized' method? You could then use the standard NiFi
thread management and avoid a lot of complexity. And the result should be at
least as efficient as having a dedicated thread to manage the problem action.
--Matt F
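A minimal sketch of the 'synchronized' method idea, assuming the concurrent tasks all call into one shared processor instance. UnsafeClient is a hypothetical stand-in for the non-thread-safe SDK client, and the onTrigger signature here is simplified and is not the NiFi one.

```java
import java.util.HashMap;
import java.util.Map;

class SynchronizedMethodSketch {
    // Hypothetical stand-in for the non-thread-safe SDK client.
    static class UnsafeClient {
        private final Map<String, Integer> sizes = new HashMap<>();

        void create(String path, byte[] data) {
            sizes.put(path, data.length);
        }

        int count() {
            return sizes.size();
        }
    }

    private final UnsafeClient client = new UnsafeClient();

    // 'synchronized' on an instance method locks 'this', so at most one
    // thread at a time runs any synchronized method of this object.
    private synchronized void createFileSafely(String path, byte[] data) {
        client.create(path, data);
    }

    private synchronized int fileCount() {
        return client.count();
    }

    // Simplified stand-in for onTrigger: per-call work stays outside the
    // lock, only the SDK call itself is serialized.
    void onTrigger(String path, byte[] data) {
        createFileSafely(path, data);
    }

    // Calls onTrigger from several threads on one shared instance.
    static int run(int threads, int filesPerThread) {
        SynchronizedMethodSketch processor = new SynchronizedMethodSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < filesPerThread; i++) {
                    processor.onTrigger("/dir/file-" + id + "-" + i, new byte[8]);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return processor.fileCount();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 500)); // prints 2000
    }
}
```

A synchronized instance method only protects state reached through that one instance. If the SDK object were shared across several instances, a lock shared by all of them would be needed instead.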
________________________________
From: Kumiko Yada <[email protected]>
Sent: Thursday, June 09, 2016 11:49 AM
To: [email protected]
Cc: Kevin Verhoeven; Ki Kang
Subject: Re: Custom processor is failing for concurrency
Hi Bryan,
Does this mean that even if I create multiple threads in onTrigger, I will
still hit the Microsoft SDK issue, since the SDK is not thread-safe? It sounds
like what I am trying to do and configuring multiple threads via the UI
might amount to the same thing.
Thanks
Kumiko
________________________________
From: Bryan Bende <[email protected]>
Sent: Thursday, June 9, 2016 11:26:10 AM
To: [email protected]
Cc: Kevin Verhoeven; Ki Kang
Subject: Re: Custom processor is failing for concurrency
Kumiko,
In general you shouldn't have to create threads in your processors, with the
exception of some special cases.
The framework has a thread pool and it takes one of those threads and calls the
onTrigger method of your processor.
If you want multiple threads to call onTrigger, then each processor has a
Concurrent Tasks property in the UI on the scheduling tab,
which equates to the number of threads that will concurrently call onTrigger.
A processor developer needs to only worry about the business logic in the
onTrigger method, and needs to ensure
thread-safe access to any member variables or state stored in the processor.
Hope that helps.
-Bryan
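To illustrate the point about member variables, here is a small sketch that uses only the JDK. A fixed thread pool plays the role of the framework's pool, its size plays the role of Concurrent Tasks, and onTrigger is a simplified stand-in without the NiFi parameters. All names are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class ConcurrentTasksSketch {
    // Member state is shared by every thread that calls onTrigger,
    // so it uses a thread-safe type rather than a plain long.
    private final AtomicLong processed = new AtomicLong();

    // Simplified stand-in for onTrigger; the business logic would go here.
    void onTrigger() {
        processed.incrementAndGet();
    }

    long processedCount() {
        return processed.get();
    }

    // Simulates the framework: 'concurrentTasks' pool threads invoke
    // onTrigger on the same processor instance.
    static long run(int concurrentTasks, int triggers) {
        ConcurrentTasksSketch processor = new ConcurrentTasksSketch();
        ExecutorService pool = Executors.newFixedThreadPool(concurrentTasks);
        for (int i = 0; i < triggers; i++) {
            pool.execute(processor::onTrigger);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processor.processedCount();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // prints 10000
    }
}
```

With a plain long field and processed++ the final count could come out lower than the number of triggers, because the increment is not atomic.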
On Thu, Jun 9, 2016 at 2:11 PM, Kumiko Yada <[email protected]> wrote:
Microsoft found that this is an issue with the SDK. They are working on a fix,
but they do not have an ETA for it. To work around this issue, I’m trying to
create multiple threads using AbstractSessionFactoryProcessor and handle
the file creation in a single thread. I’m having a problem: the single
thread is not working correctly, and the processor is still acting as if it
were single-threaded.
When I create a thread to handle the file creation, do I have to call this
method using java.util.concurrent.ExecutorService?
Are there any sample processors that I can take a look at?
Thanks
Kumiko
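One way to funnel file creation through a single thread with java.util.concurrent.ExecutorService is sketched below. It is only an illustration under assumptions: the class, the createFile stand-in, and the list that replaces the real SDK call are all hypothetical, and nothing here uses AbstractSessionFactoryProcessor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SingleThreadCreateSketch {
    // Every file creation is handed to this one worker thread.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Only the worker thread touches this list, so a plain ArrayList is enough.
    private final List<String> created = new ArrayList<>();

    // Submits a task to the worker and blocks the caller until it is done.
    private int runOnWorker(Callable<Integer> task) {
        Future<Integer> result = worker.submit(task);
        try {
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Callable from any thread; the body stands in for the SDK call.
    int createFile(String path) {
        return runOnWorker(() -> {
            created.add(path);
            return created.size();
        });
    }

    int createdCount() {
        return runOnWorker(created::size);
    }

    void shutdown() {
        worker.shutdown();
    }

    // Several caller threads share one instance and one worker thread.
    static int run(int callers, int filesPerCaller) {
        SingleThreadCreateSketch sketch = new SingleThreadCreateSketch();
        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < callers; c++) {
            final int id = c;
            Thread t = new Thread(() -> {
                for (int i = 0; i < filesPerCaller; i++) {
                    sketch.createFile("/dir/file-" + id + "-" + i);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        int total = sketch.createdCount();
        sketch.shutdown();
        return total;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 500)); // prints 1500
    }
}
```

Because each caller waits on Future.get(), the create step still runs one file at a time however many caller threads exist. Only the work done outside createFile can overlap.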
From: Kumiko Yada [mailto:[email protected]]
Sent: Sunday, June 5, 2016 6:28 PM
To: [email protected]
Cc: Ki Kang <[email protected]>; Kevin Verhoeven <[email protected]>
Subject: RE: Custom processor is failing for concurrency
Thank you, Bryan. I’m working with Microsoft on this issue. Will keep you
guys updated.
Thanks
Kumiko
From: Bryan Bende [mailto:[email protected]]
Sent: Friday, June 3, 2016 2:32 PM
To: [email protected]
Subject: Re: Custom processor is failing for concurrency
It is hard to say for sure, but I think your NiFi processor is generally ok
regarding thread safety, but I think there could be a problem in the Azure SDK
code...
RequestFactory has an instance of BaseUrl and every time
RequestFactory.create() is called, it calls BaseUrl.url().
The implementation of BaseUrl is the following (according to my IntelliJ
attaching the sources...):
public class AutoRestBaseUrl implements BaseUrl {
    /** A template based URL with variables wrapped in {}s. */
    private String template;
    /** a mapping from {} wrapped variables in the template and their actual values. */
    private Map<CharSequence, String> mappings;

    @Override
    public HttpUrl url() {
        String url = template;
        for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
            url = url.replace(entry.getKey(), entry.getValue());
        }
        mappings.clear();
        return HttpUrl.parse(url);
    }

    /**
     * Creates an instance of a template based URL.
     *
     * @param url the template based URL to use.
     */
    public AutoRestBaseUrl(String url) {
        this.template = url;
        this.mappings = new HashMap<>();
    }

    /**
     * Sets the value for the {} wrapped variables in the template URL.
     * @param matcher the {} wrapped variable to replace.
     * @param value the value to set for the variable.
     */
    public void set(CharSequence matcher, String value) {
        this.mappings.put(matcher, value);
    }
}
The exception is coming from the line where it is looping over the entryset:
for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
Right after that loop it calls mappings.clear(), so if the RequestFactory is
shared by multiple threads (which I think it is), then one thread could be
iterating over the set while another calls mappings.clear().
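The interleaving described above can be replayed on a single thread, which makes the failure deterministic. This is only a sketch: the placeholder names and values in the map are made up, and the clear() inside the loop stands in for what a second thread's url() call would do while the first thread is still iterating.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

class ClearDuringIterationSketch {
    // Returns true if clearing the map in the middle of an iteration makes
    // the iterator fail, as in the stack trace from the processor.
    static boolean clearWhileIteratingThrows() {
        Map<CharSequence, String> mappings = new HashMap<>();
        // Hypothetical template variables; at least two entries are needed
        // so the iterator still has an element to fetch after the clear.
        mappings.put("{accountName}", "myaccount");
        mappings.put("{dnsSuffix}", "example.net");

        String url = "https://{accountName}.{dnsSuffix}";
        try {
            for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
                url = url.replace(entry.getKey(), entry.getValue());
                // Stand-in for the other thread reaching mappings.clear().
                mappings.clear();
            }
            return false;
        } catch (ConcurrentModificationException e) {
            // Thrown from the iterator's next(), which notices that the
            // map was structurally modified after the iterator was created.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(clearWhileIteratingThrows()); // prints true
    }
}
```

HashMap iterators are fail-fast on a best-effort basis, so with real threads the exception shows up only on some runs, which would be consistent with the processor failing often rather than always.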
On Fri, Jun 3, 2016 at 5:09 PM, Oleg Zhurakousky <[email protected]> wrote:
Kumiko
It appears that the current state of the source you linked to is not in sync
with what is in the stack trace. Perhaps you have made some code modifications
(e.g., line 218 is an empty line in the code, while the stack trace points
to it).
In any event, from what I can see the error is coming from the Azure libraries
(not NiFi). Specifically, ‘com.microsoft.rest.AutoRestBaseUrl.url(..)’ seems to
be doing some iteration where I presume the remove is called. Perhaps it is not
a thread-safe class after all. What does the Microsoft documentation say? Have
you looked at the source to see what’s going on there? If it’s open, please link
it and we can take a look.
Cheers
Oleg
On Jun 3, 2016, at 4:58 PM, Kumiko Yada <[email protected]> wrote:
Here is the code, https://github.com/kyada1/PutFileAzureDLStore.
Thanks
Kumiko
From: Bryan Bende [mailto:[email protected]]
Sent: Friday, June 3, 2016 12:57 PM
To: [email protected]
Subject: Re: Custom processor is failing for the custom processor
Hello,
Would you be able to share your code for PutFileAzureDLStore so we can help
identify if there is a concurrency problem?
-Bryan
On Fri, Jun 3, 2016 at 3:39 PM, Kumiko Yada <[email protected]> wrote:
Hello,
I wrote the following custom service control and processor. When the custom
processor runs concurrently, it often fails with several different
errors. Is there any special handling for concurrency that I need to add to
the custom processor? I wrote a sample Java program that does the same
thing as the custom processor (authenticate every time a file is
created, create a file, create 2 threads and run them concurrently), and it
works fine. The custom processor also works fine when it is not running
concurrently.
Custom service control – set the properties for the Microsoft Azure Datalake
Store
Custom processor – authenticate, then create a file in Microsoft Azure Datalake
Store
Error1:
2016-06-03 12:29:31,942 INFO [pool-2815-thread-1] c.m.aad.adal4j.AuthenticationAuthority [Correlation ID: 64c89876-2f7b-4fbb-8e6f-fb395a47aa87] Instance discovery was successful
2016-06-03 12:29:31,946 ERROR [Timer-Driven Process Thread-10] n.a.d.processors.PutFileAzureDLStore
java.util.ConcurrentModificationException: null
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) ~[na:1.8.0_77]
    at java.util.HashMap$EntryIterator.next(HashMap.java:1463) ~[na:1.8.0_77]
    at java.util.HashMap$EntryIterator.next(HashMap.java:1461) ~[na:1.8.0_77]
    at com.microsoft.rest.AutoRestBaseUrl.url(AutoRestBaseUrl.java:28) ~[na:na]
    at retrofit2.RequestFactory.create(RequestFactory.java:50) ~[na:na]
    at retrofit2.OkHttpCall.createRawCall(OkHttpCall.java:181) ~[na:na]
    at retrofit2.OkHttpCall.execute(OkHttpCall.java:165) ~[na:na]
    at com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432) ~[na:na]
    at nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252) ~[na:na]
    at nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218) ~[na:na]
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
Error2:
2016-06-03 12:29:24,913 WARN [Timer-Driven Process Thread-5] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PutFileAzureDLStore[id=2133fdad-54b7-46a5-9bcf-958ddd8cfac0] due to uncaught Exception: com.microsoft.rest.ServiceException: Invalid status code 403
2016-06-03 12:29:24,915 WARN [Timer-Driven Process Thread-5] o.a.n.c.t.ContinuallyRunProcessorTask
com.microsoft.rest.ServiceException: Invalid status code 403
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_77]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_77]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_77]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_77]
    at com.microsoft.rest.ServiceResponseBuilder.build(ServiceResponseBuilder.java:147) ~[na:na]
    at com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.createDelegate(FileSystemOperationsImpl.java:1491) ~[na:na]
    at com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432) ~[na:na]
    at nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252) ~[na:na]
    at nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218) ~[na:na]
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057) ~[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
Thanks
Kumiko