Giovanni,
I second all of Andy's answers; they are spot-on. The each()
construct is "safe" in the sense that you will be working with one
flow file at a time, but remember that there is only one "session".
If you throw an Exception from inside the each() and your script does
not catch it, ExecuteScript will catch it and roll back the entire
session, including any flow files you already transferred earlier in
the loop. You are probably better off with the approach you outlined,
where you wrap the logic in a try/catch and route to success/failure
accordingly... unless an error indicates a "retry all", in which case
a rollback is likely what you want.
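To illustrate the per-item try/catch idea outside of NiFi, here is a
minimal sketch in plain Java. BatchRouter and routeEach are
hypothetical names, and the two lists merely stand in for the success
and failure relationships; no NiFi classes are involved, so treat it
as an analogy rather than as ExecuteScript code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for routing a batch of flow files; not NiFi API.
final class BatchRouter<T> {
    final List<T> success = new ArrayList<>();
    final List<T> failure = new ArrayList<>();

    // Applies the work to each item and routes the item by outcome.
    // An exception from one item is caught here, so it never reaches the
    // caller and the items that were already routed are unaffected.
    void routeEach(List<T> batch, Consumer<T> work) {
        for (T item : batch) {
            try {
                work.accept(item);
                success.add(item);
            } catch (RuntimeException e) {
                failure.add(item);
            }
        }
    }
}
```

If the catch were removed, the first failing item would abort the loop
and the caller would have to decide what to do with the whole batch,
which is the position ExecuteScript is in when it rolls the session
back.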
For the ScriptTester, I haven't yet added support for setting
attributes on incoming flow file(s). I am trying to think of a clean
way to allow them for arbitrary flow files, such as when the --input
switch is specified. Suggestions are welcome :) For the first
go-round I might allow something such that attributes would be added
to all flow files, or at least to one coming in via STDIN.
For the single fat/shaded JAR, you can certainly do things that way,
but if you are using Groovy, Clojure, or JavaScript/Nashorn, you can
put all the JARs in a single directory (not nested!) and just add the
directory to your Module Directory property. That might save you a
build/package step. It doesn't help with reloading, though.
For the on-the-fly reload of an updated fat JAR, you could (at the
expense of performance) have the script load the JAR. At that point
you'd probably be better served with InvokeScriptedProcessor so you
could add a FileWatcher at startup, and reload the JAR from a
separate thread when changes are detected. In either case I believe
you'd be looking at creating a URLClassLoader with your fat JAR as
the only URL, and the current ClassLoader as its parent. Then you can
set the Thread's context classloader to the new one, and/or you may
need to do some more classloading voodoo.
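A minimal sketch of that classloader setup, in plain Java using only
JDK classes. FatJarLoader, loaderFor, reload and runWith are
hypothetical names, the caller decides which loader counts as the
"current" parent, and the file-watching part is left out: whatever
detects the change would simply call reload(). It is untested inside
NiFi, so take it as a starting point only.

```java
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper; nothing here is NiFi API.
final class FatJarLoader {
    private final AtomicReference<URLClassLoader> current = new AtomicReference<>();

    // Builds a loader whose only URL is the fat JAR, with the given parent.
    static URLClassLoader loaderFor(Path fatJar, ClassLoader parent) {
        try {
            return new URLClassLoader(new URL[] { fatJar.toUri().toURL() }, parent);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Not a usable JAR path: " + fatJar, e);
        }
    }

    // Swaps in a fresh loader for the (updated) JAR and closes the old one.
    // Classes already loaded by the old loader stay usable, but it can no
    // longer load new classes once closed.
    URLClassLoader reload(Path fatJar, ClassLoader parent) {
        URLClassLoader fresh = loaderFor(fatJar, parent);
        URLClassLoader old = current.getAndSet(fresh);
        if (old != null) {
            try {
                old.close();
            } catch (IOException ignored) {
                // Best effort: the old loader is being discarded anyway.
            }
        }
        return fresh;
    }

    // Runs a task with the loader installed as the thread's context
    // classloader, restoring the previous one afterwards.
    static void runWith(ClassLoader loader, Runnable task) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            task.run();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }
}
```

Restoring the previous context classloader in a finally block keeps
the swap from leaking into whatever else runs on the same thread.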
Not sure if I covered all your questions/comments, but if not please
let me know and I will try again :)
Regards,
Matt
On Wed, Oct 4, 2017 at 3:18 AM, Giovanni Lanzani
<[email protected]> wrote:
Hi Andy,
That's very helpful, thanks! My comments are inline, while we wait
for Matt to come home :)
On 3 Oct 2017, at 22:44, Andy LoPresto wrote:
Giovanni,
A lot of great questions here. I’ll try to go through them but I hope
Matt weighs in as well (he is on vacation for the next few days
though).
* The only time I am aware the Jars are reloaded is at processor
restart (I believe this is the same for the script content if it is
defined by a referenced file). The scriptingComponentHelper setup*()
methods execute inside ExecuteScript#setup(), which has the
@OnScheduled annotation [1].
Has anyone written some sort of script (I don't know if it is
possible) to query the NiFi API for all the (Groovy ExecuteScript)
processors using a particular module directory (we plan to use a
single one for everything), so that I could add a new step, after the
shadowJar deployment, that restarts all of them?
I imagine this would be a fairly common use case. Where I'm currently
working we have the following workflow:
1. Have a single jar with all the code that the Groovy scripts will
need;
2. The Groovy scripts will use that code with minimal boilerplate
around it, so all the (non-NiFi) related code is in the jar. This
makes it very easy to test the logic in the jar. We added some extra
code to ensure the functions that the Groovy scripts will call are
"NiFi compatible" (right now it's just
.getBytes(StandardCharsets.UTF_8)). We don't use Matt's framework
because we need the incoming flowFile to have attributes, and I
couldn't figure out how to do it :)
3. NiFi has a flow to fetch new master updates on the repo and
compile the (fat) jar as a result. However we would need to restart
the ExecuteScript processors by hand and... no/no? :) A script would
help greatly here (if nobody has one, I will dig into the API to see
what's possible. I might just parse the whole XML file if there's no
way to do so via the API);
* I’m not sure how other users bundle their dependencies, but shadow
Jars would be fine for this use case, and Matt has referenced using
them in his script-tester article [2].
* Yes, while there are small idiosyncrasies with each language
flavor, the NiFi-related domain is fairly consistent. In this case,
iterating over a number of flowfiles for processing in a single
Groovy script is fine. Session.get(int) [3] is delegated to
ProcessSession and returns List<FlowFile>, so you can use any of the
Groovy collections methods over it.
So what happens in this case?
    def n = 0
    session.get(N).each { flowFile ->
        if (n == 0) {
            // do something
        } else {
            throw new Exception()
        }
        session.transfer(flowFile, REL_SUCCESS)
        n += 1
    }
Will the first flowFile be successfully transferred or will a
rollback happen? (Note: I usually wrap the logic in try/catch and
then, based on the result, transfer the file to
REL_SUCCESS/REL_FAILURE.)
Thanks again,
Giovanni
Hopefully this helps you, and if Matt or anyone else sees a mistake,
I hope they correct it and add their thoughts. Thanks.
[1] https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#onscheduled
[2] https://funnifi.blogspot.com/2016/06/testing-executescript-processor-scripts.html
[3] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java#L1520
Andy LoPresto
[email protected]
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
On Oct 3, 2017, at 1:09 PM, Giovanni Lanzani
<[email protected]> wrote:
I apologize if this is specified elsewhere, but I couldn't find it.
1. I was wondering when the jars used by a particular Groovy script
(in the ExecuteScript processor) are reloaded, i.e. if one jar is
updated, when will the script pick up the new version? I know that
upon restarting the processor the updated jar is picked up, but I was
wondering on which other occasions that happens;
2. Do people tend to use fat (shadow) jars for this sort of jar
referenced by Groovy scripts? I don't think it makes sense to keep
track of all the dependencies manually otherwise;
3. When using the {P,J}ython processor, I read Matt's advice to use
the following construct in the script:
    for flowFile in session.get(N):
        if flowFile:
            # do your thing here
            pass
Does the same hold for Groovy, i.e. should someone do
    session.get(N).each { flowFile ->
        // do your thing here
        if (condition) {
            session.transfer(flowFile, REL_SUCCESS)
        } else {
            session.transfer(flowFile, REL_FAILURE)
        }
    }
Is this approach safe in Groovy inside an each? Or is this approach
not needed at all in Groovy, while it is needed in {P,J}ython?
Thanks in advance!
Giovanni