Sally,

Although it may be possible to do some multithreading inside ExecuteScript, it is not really designed for this kind of thing; rather it is usually for short code blocks to perform some transformation or parse some format that existing NiFi processors do not (yet) handle. In briefly looking at your code, might I suggest one of the following options:

1) It looks like you are reading in an XML file and performing some transformation upon it. You can do this in NiFi with ListFile -> FetchFile -> TransformXml if you are comfortable with XSLT to do the transformation. You could still do the transformation logic in ExecuteScript, but you certainly do not need to read in the files from the script; NiFi has plenty of support for that.

2) It looks like you are trying to do two separate things at the same time above, and I'm not sure why they have to be concurrent. The first thread appears to just read in the file and write it out as a flow file. ListFile -> FetchFile can do that, and you can connect the "success" relationship to any number of downstream processors. The second one also reads in the file and does some XML transformation; please see #1 for suggestions on that. Also I'm not sure why you'd use a RandomAccessFile for XML content...

3) InvokeScriptedProcessor is better equipped to handle more complex tasks, lifecycle, etc. than ExecuteScript.

4) For truly custom (and non-trivial) logic, consider writing your own full-fledged processor. You can write it in Groovy, and there is a Maven archetype for creating a bundle with a module for the NiFi ARchive (NAR) and a module for the processors/components.

Regards,
Matt

On Tue, Dec 19, 2017 at 7:45 AM, sally <[email protected]> wrote:
> I want to use natural threads for read - write operation in config file , i
> have used threads implemented in groovy, but my code neither throws
> exception nor makes anything else , here are several subjects i am
> interested in:
>
> Is it possible to implement natural Threads inside nifi executeScript
> processor groovy code?
> How should i implement this logic inside nifi environment without threads?
>
> here is my code example, WHAT SHOULD I CHANGE TO MAKE THIS CODE WORK?
>
> static String content = "";
> static File file = new File("C:/Users/Desktop/test/conf.xml");
> static BufferedReader s;
> static BufferedWriter w;
> static RandomAccessFile ini= new RandomAccessFile(file, "rwd");
> static FileLock lock= ini.getChannel().lock();
> def static thread=Thread.start{
>     try {
>         String sCurrentLine;
>         s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8"));
>         while ((sCurrentLine = s.readLine()) != null) {
>             content += sCurrentLine;
>         }
>
>         ini.seek(0);
>         def flowFile1 = session.create()
>         flowFile1 = session.putAttribute(flowFile1, "filename",
> "conf.xml");
>         session.write(flowFile1, new StreamCallback() {
>             @Override
>             public void process(InputStream inputStream1, OutputStream
> outputStream) throws IOException {
>
>                 outputStream.write(content.getBytes(StandardCharsets.UTF_8))
>             }
>
>         });
>         session.transfer(flowFile1, REL_SUCCESS);
>
>         def xml = new XmlParser().parseText(content);
>         xml.'**'.findAll{it.name() == 'runAs'}.each{ it.replaceBody 'false'};
>         def newxml1 = XmlUtil.serialize(xml);
>         String data = newxml1;
>         if (!data.isEmpty()) {
>             ini.setLength(0);
>             w = new BufferedWriter(Channels.newWriter(ini.getChannel(),
> "UTF-8"));
>             w.write(data);
>             lock.release();
>             w.close();
>
>         }
>
>     }catch (FileNotFoundException e) {
>         TimeUnit.SECONDS.sleep(50000);
>         e.printStackTrace();
>     } catch (IOException e) {
>         e.printStackTrace();
>
>     } catch(OverlappingFileLockException e){
>         TimeUnit.SECONDS.sleep(50000);
>         lock.release();
>         e.printStackTrace();
>     } catch (Exception e) {
>         e.printStackTrace();
>     } finally {
>
>         lock.release()
>         thread.stop();
>     }
>
> }
>
> def static thread2=Thread.start{
>     try {
>
>         String sCurrentLine;
>         s = new BufferedReader(Channels.newReader(ini.getChannel(),
> "UTF-8"));
>         while ((sCurrentLine = s.readLine()) != null) {
>             content += sCurrentLine;
>         }
>         ini.seek(0);
>
>         def flowFile = session.get();
>         if (flowFile != null) return;
>         def serviceName = flowFile.getAttribute('serviceName');
>         def date = flowFile.getAttribute('filename').substring(0, 10);
>         def xml = new XmlParser().parseText(content)
>         if (serviceName == 'borderCrossDecl') {
>
>             xml.RS.borderCrossDecl.details.findAll({ p ->
>                 p.runAs[0].text() == "false" && p.start[0].text() ==
> date.toString();
>             }).each({ p ->
>                 p.start[0].value = addDays(p.start[0].text())
>                 p.runAs[0].value = "true"
>             })
>         }
>         def xml1 = new XmlParser().parseText(content);
>         def newXml = XmlUtil.serialize(xml1)
>         String data = newXml.toString();
>         if (!data.isEmpty()) {
>             ini.setLength(0);
>             w = new BufferedWriter(Channels.newWriter(ini.getChannel(),
> "UTF-8"));
>             w.write(data);
>             lock.release();
>             w.close();
>
>         }
>     }catch (FileNotFoundException e) {
>         TimeUnit.SECONDS.sleep(50000);
>         e.printStackTrace();
>     } catch (IOException e) {
>         e.printStackTrace();
>
>     } catch(OverlappingFileLockException e){
>         TimeUnit.SECONDS.sleep(50000);
>         lock.release();
>         e.printStackTrace();
>     } catch (Exception e) {
>         e.printStackTrace();
>     } finally {
>         lock.release();
>         thread2.stop();
>     }
>
> };
>
> thread.join();
> thread2.join();
>
>
>
> --
> Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/
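
To illustrate the point in option 1 that the transformation itself needs neither threads nor file handling, here is a minimal Groovy sketch of the "set every runAs to false" step from the quoted code, written as a plain function over a String. The function name resetRunAs is hypothetical and not from the thread, and the sample XML in the usage lines is invented for illustration. It assumes a Groovy version where XmlParser resolves without an import (2.x or 3.x).

```groovy
import groovy.xml.XmlUtil

// Hypothetical helper (not from the original thread): returns a copy of the XML
// in which the body of every <runAs> element is replaced with 'false'.
// Assumption: XmlParser resolves without an import (Groovy 2.x/3.x, groovy.util);
// on Groovy 4+ you would need `import groovy.xml.XmlParser` as well.
String resetRunAs(String xmlText) {
    def root = new XmlParser().parseText(xmlText)
    // depthFirst() walks the whole tree, like the '**' shortcut in the quoted code
    root.depthFirst()
        .findAll { it instanceof Node && it.name() == 'runAs' }
        .each { it.value = 'false' }
    return XmlUtil.serialize(root)
}

// Usage with made-up sample content
def sample = '<conf><details><runAs>true</runAs><start>2017-12-19</start></details></conf>'
println resetRunAs(sample)
```

Inside ExecuteScript, a function like this would typically be called from a session.write callback on the flow file that FetchFile produced, so the script never opens the file itself or starts a thread.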
