Re: Do CRON driven and Yield duration work together?
Jim,

Yes, they should work together in concert.

Thanks
-Mark

> On Dec 18, 2017, at 12:16 PM, James McMahon wrote:
>
> Hello. I would like to set up a processor that initiates my workflow as a
> CRON driven processor. Furthermore, I must ensure that this processor does
> not run for at least an hour from its previous run.
>
> I would set my processor CRON driven specification like this:
> 0 15 * * * ? *
> and I think I should set my Yield duration like this
> 3601 seconds
>
> Can I use CRON driven in concert with Yield duration like I described to meet
> this requirement? Thanks very much in advance for your help.
Do CRON driven and Yield duration work together?
Hello. I would like to set up a processor that initiates my workflow as a CRON driven processor. Furthermore, I must ensure that this processor does not run for at least an hour from its previous run.

I would set my processor CRON driven specification like this:
0 15 * * * ? *
and I think I should set my Yield duration like this:
3601 seconds

Can I use CRON driven in concert with Yield duration like I described to meet this requirement? Thanks very much in advance for your help.
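For readers unfamiliar with the expression in the question: NiFi's CRON driven scheduling takes Quartz-style expressions, whose fields are seconds, minutes, hours, day of month, month, day of week and an optional year. The sketch below only labels those fields and works out the gap between two consecutive firings of "0 15 * * * ? *" with plain java.time arithmetic. It does not use Quartz or NiFi, the class and method names (CronFieldsSketch, labelFields, nextFiring) are made up for illustration, and nextFiring is hard-wired to this one pattern rather than being a general cron parser.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.Map;

public class CronFieldsSketch {
    // Quartz-style field order; the seventh field (year) is optional.
    static final String[] FIELD_NAMES = {
        "seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year"
    };

    // Pair each whitespace-separated field of the expression with its name.
    static Map<String, String> labelFields(String expression) {
        String[] parts = expression.trim().split("\\s+");
        if (parts.length < 6 || parts.length > 7) {
            throw new IllegalArgumentException("expected 6 or 7 fields, got " + parts.length);
        }
        Map<String, String> labelled = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            labelled.put(FIELD_NAMES[i], parts[i]);
        }
        return labelled;
    }

    // Next firing strictly after the given time, for the single pattern
    // "second 0, minute 15 of every hour" (illustration only).
    static LocalDateTime nextFiring(LocalDateTime after) {
        LocalDateTime candidate = after.withMinute(15).withSecond(0).withNano(0);
        return candidate.isAfter(after) ? candidate : candidate.plusHours(1);
    }

    public static void main(String[] args) {
        System.out.println(labelFields("0 15 * * * ? *"));
        LocalDateTime first = nextFiring(LocalDateTime.of(2017, 12, 18, 12, 16));
        LocalDateTime second = nextFiring(first);
        System.out.println("gap between firings: "
                + Duration.between(first, second).getSeconds() + " s");
    }
}
```

The gap comes out at 3600 seconds, one second less than the proposed 3601-second Yield duration. How NiFi treats a CRON trigger that arrives while a processor is still yielded is not modelled here.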
Re: Prioritizing flowFiles to tailor throughput
Hey James,

Sorry no one responded when you first sent the message, but I'm curious what you ended up doing and any findings you had. I also wanted to bring this thread back to the attention of the larger group, as it raises some interesting questions I haven't found discussed elsewhere.

On the topic of the re-sorting of the queue, I was curious about the answer, so I dug down to the StandardFlowFileQueue and found that it's primarily just wrapping an instance of Java's PriorityQueue for its active queue[1]. This means that sorting is done each time a FlowFile is enqueued, but also that we have immediate access to the head of the queue. I'm sure someone else (Mark Payne?) could explain better how we make use of the nuances of the queue for better performance and the impacts the different queue prioritizers have.

For the higher priority FlowFiles starving out lower priority ones, I'm thinking about a way to give a weight instead of a priority. So in essence, a "weighted funnel processor", which grabs X FlowFiles each time but has a weighting assigned to different categories, such that you take a certain number of each category based on a given weight. That said, I'm not sure that would be guaranteed to work when FlowFiles in the queue are swapped out, since even if we iterated over everything in the incoming connection, there are still others swapped to disk. Also, there are probably performance concerns if we tried to implement it using the current tools offered to a processor.

For the separate NiFis approach, I'm curious what others' views are. Personally, it makes sense to me that for flows that are dramatically different in priority you'd want to section them off to another instance of NiFi. Essentially, the separation between data-plane and control-plane instances of NiFi.

Lastly, James, I assume you're limited to using the 0.7.x release for a specific reason? I'd highly suggest upgrading to the latest version whenever possible. There are many security and performance improvements, and of course many new features.

[1] https://github.com/apache/nifi/blob/7f4cfd51ea07ead6c9b71b6c6d6f87a352b801d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java#L89

Joe

On Thu, Oct 19, 2017 at 8:58 AM, James McMahon wrote:

> Our team is considering ways to accelerate delivery for certain subsets of
> content we process through NiFi. We are using Apache NiFi 0.7.x as our
> baseline.
>
> This link discusses a recommended approach to content prioritization using
> PriorityAttributePrioritizer on a connector (queue) to tailor throughput
> based on a priority attribute we set upstream in our flow:
>
> https://stackoverflow.com/questions/42528993/how-to-specify-priority-attributes-for-individual-flowfiles
>
> How often does the connector queue have to re-sort contents in order to
> enforce our priority attribute? Is it re-sorting *every* single time new
> flowFiles hit the queue? Won't that markedly and negatively impact
> performance?
>
> If our priority 1s are a huge volume of flowfiles that persists over time,
> won't this approach cause our priority 2s, 3s, etc etc to languish in queue?
>
> The described approach seems to embed significant business logic in the
> NiFi workflows. In an environment where priorities change often, would that
> be considered a poor approach? Might it be better to enforce priority
> processing at a higher architectural level - a lightweight NiFi server to
> accelerate delivery of priority one content and email alerts, a priority
> two suite of NiFi servers for standard flowfile volume, a priority three
> suite of servers to handle long-term bulk processing, etc etc?
>
> Thanks in advance for your help. -Jim
>

--
*Joe Percivall*
linkedin.com/in/Percivall
e: jperciv...@apache.com
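To make the PriorityQueue point above concrete, here is a small sketch using java.util.PriorityQueue on its own. It is not NiFi code: Item is a hypothetical stand-in for a FlowFile, and the "lower number is more urgent" ordering is an assumption borrowed from the "priority 1s" wording in the quoted question, not a statement about any NiFi prioritizer. java.util.PriorityQueue is a binary heap, so each add is an O(log n) adjustment rather than a full re-sort, and peek is constant time.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class PriorityQueueSketch {
    // Hypothetical stand-in for a FlowFile: a name plus a numeric priority.
    static final class Item {
        final String name;
        final int priority;

        Item(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    // Assumption for this sketch: a lower number means "more urgent".
    static PriorityQueue<Item> newQueue() {
        return new PriorityQueue<>(Comparator.comparingInt((Item i) -> i.priority));
    }

    // Poll until empty and return the names in the order the queue hands them out.
    static List<String> drain(PriorityQueue<Item> queue) {
        List<String> order = new ArrayList<>();
        Item next;
        while ((next = queue.poll()) != null) {
            order.add(next.name);
        }
        return order;
    }

    public static void main(String[] args) {
        PriorityQueue<Item> queue = newQueue();
        queue.add(new Item("bulk", 3));      // each add restores heap order in O(log n)
        queue.add(new Item("standard", 2));
        queue.add(new Item("alert", 1));
        System.out.println(queue.peek().name); // head is available immediately: alert
        System.out.println(drain(queue));      // [alert, standard, bulk]
    }
}
```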
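The "weighted funnel" idea in the reply can be sketched in a few lines. This is only an illustration of the selection rule (take up to a fixed number from each category per pass), not a NiFi processor: the category names, the per-category Deques and the nextBatch method are all hypothetical, and the sketch ignores swapped-out FlowFiles entirely, which is exactly the complication the reply warns about.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WeightedBatchSketch {
    // Take up to `weight` items from each category per pass, visiting
    // categories in the iteration order of the weights map.
    static List<String> nextBatch(Map<String, Deque<String>> queues,
                                  Map<String, Integer> weights) {
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : weights.entrySet()) {
            Deque<String> queue = queues.get(entry.getKey());
            for (int i = 0; queue != null && i < entry.getValue() && !queue.isEmpty(); i++) {
                batch.add(queue.poll());
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        Map<String, Deque<String>> queues = new LinkedHashMap<>();
        queues.put("p1", new ArrayDeque<>(Arrays.asList("a1", "a2", "a3", "a4", "a5")));
        queues.put("p2", new ArrayDeque<>(Arrays.asList("b1", "b2")));

        Map<String, Integer> weights = new LinkedHashMap<>();
        weights.put("p1", 3); // three priority-1 items per pass
        weights.put("p2", 1); // one priority-2 item per pass, so p2 never starves

        System.out.println(nextBatch(queues, weights)); // [a1, a2, a3, b1]
        System.out.println(nextBatch(queues, weights)); // [a4, a5, b2]
    }
}
```

Unused quota is not redistributed in this sketch: if one category is empty, the batch is simply smaller.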
how to use threads in nifi?
I have one config file inside my NiFi environment in which I need to change, check and update values, but I need to do this from independent threads in such a way that the threads do not interfere with each other. Here are the things I am interested in:

1. Can I use synchronized code blocks inside the ExecuteScript processor?
2. How can I use thread locks inside NiFi?

Here is my code, but it doesn't work properly. What should I change to make my logic work?

static String content = "";
static File file = new File("C:/Users/Desktop/test/conf.xml");
static BufferedReader s;
static BufferedWriter w;
static RandomAccessFile ini= new RandomAccessFile(file, "rwd");
static FileLock lock= ini.getChannel().lock();

public static synchronized void read() {
    try {
        String sCurrentLine;
        s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8"));
        while ((sCurrentLine = s.readLine()) != null) {
            content += sCurrentLine;
        }
        ini.seek(0);
        /* def flowFile1 = session.create()
        flowFile1 = session.putAttribute(flowFile1, "filename", "conf.xml");
        session.write(flowFile1, new StreamCallback() {
            @Override
            public void process(InputStream inputStream1, OutputStream outputStream) throws IOException {
                outputStream.write(content.getBytes(StandardCharsets.UTF_8))
            }
        });
        session.transfer(flowFile1, REL_SUCCESS);*/
        def xml = new XmlParser().parseText(content);
        //xml.'**'.findAll{it.name() == 'runAs'}.each{ it.replaceBody 'false'};
        def newxml1 = XmlUtil.serialize(xml);
        String data = newxml1;
        if (!data.isEmpty()) {
            ini.setLength(0);
            w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
            w.write(data);
            lock.release();
            w.close();
        }
        println data;
    } catch (FileNotFoundException e) {
        TimeUnit.SECONDS.sleep(5);
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (OverlappingFileLockException e) {
        TimeUnit.SECONDS.sleep(5);
        lock.release();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        w.close();
        ini.close();
    }
}

public static void write() {
    synchronized (Main1.class) {
        try {
            String sCurrentLine;
            s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8"));
            while ((sCurrentLine = s.readLine()) != null) {
                content += sCurrentLine;
            }
            // println content;
            ini.seek(0);
            /* def flowFile = session.get();
            if (flowFile != null) return;
            def serviceName = flowFile.getAttribute('serviceName');
            def date = flowFile.getAttribute('filename').substring(0, 10);
            def xml = new XmlParser().parseText(content)
            if (serviceName == 'borderCrossDecl') {
                xml.RS.borderCrossDecl.details.findAll({ p ->
                    p.runAs[0].text() == "false" && p.start[0].text() == date.toString();
                }).each({ p ->
                    p.start[0].value = addDays(p.start[0].text())
                    p.runAs[0].value = "true"
                })
            }*/
            def newXml = groovy.xml.XmlUtil.serialize(content)
            String data = newXml.toString();
            if (!data.isEmpty()) {
                ini.setLength(0);
                w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
                w.write(data);
                lock.release();
                w.close();
            }
        } catch (FileNotFoundException e) {
            TimeUnit.SECONDS.sleep(5);
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (OverlappingFileLockException e) {
            TimeUnit.SECONDS.sleep(5);
            lock.release();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            w.close();
            ini.close();
        }
    }
}

public static void main(String [] args){
    read();
    write();
}

--
Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/
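As a point of comparison for the question above, here is a minimal plain-Java sketch of one way to serialise a read-modify-write cycle on a single file. It is not NiFi or ExecuteScript code and has not been tried inside a processor; ConfigFileUpdater and update are hypothetical names. Two assumptions are worth stating: the java.nio FileLock documentation says file locks are held on behalf of the whole JVM and are not suitable for coordinating threads inside one JVM, so the sketch pairs the FileLock (for other processes) with a ReentrantLock (for threads); and the channel and lock are opened per call and closed by try-with-resources, rather than being held in static fields.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.UnaryOperator;

public class ConfigFileUpdater {
    // Serialises threads inside this JVM; a FileLock alone does not do that.
    private static final ReentrantLock JVM_LOCK = new ReentrantLock();

    // Read the whole file, apply `change`, write the result back, return it.
    static String update(Path file, UnaryOperator<String> change) {
        JVM_LOCK.lock();
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.CREATE,
                     StandardOpenOption.READ, StandardOpenOption.WRITE);
             FileLock fileLock = channel.lock()) { // guards against other processes
            ByteBuffer in = ByteBuffer.allocate((int) channel.size());
            while (in.hasRemaining() && channel.read(in) != -1) {
                // keep reading until the buffer is full or end of file
            }
            in.flip();
            String updated = change.apply(StandardCharsets.UTF_8.decode(in).toString());

            channel.truncate(0);   // drop the old content
            channel.position(0);
            ByteBuffer out = ByteBuffer.wrap(updated.getBytes(StandardCharsets.UTF_8));
            while (out.hasRemaining()) {
                channel.write(out);
            }
            channel.force(true);   // flush before the lock is released
            return updated;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            JVM_LOCK.unlock();     // runs after the file lock and channel are closed
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("conf", ".txt");
        update(file, old -> "0");
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 25; n++) {
                    update(file, old -> String.valueOf(Integer.parseInt(old) + 1));
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println(update(file, old -> old)); // 100 if no update was lost
        Files.delete(file);
    }
}
```

The demo uses a counter instead of XML so that a lost update is easy to spot; the `change` function is where XML parsing and serialising would go.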
how to use threads in nifi?
I have one config file inside my NiFi environment in which I need to change, check and update values, but I need to do this from independent threads in such a way that the threads do not interfere with each other. Here are the things I am interested in:

1. Can I use synchronized code blocks inside the ExecuteScript processor?
2. How can I use thread locks inside NiFi?

Here is my code, but it doesn't work properly. What should I change to make my logic work?

class Main2 extends Thread{
    static String content = "";
    static File file = new File("C:/Users/Desktop/test/conf.xml");
    static BufferedReader s;
    static BufferedWriter w;
    static RandomAccessFile ini= new RandomAccessFile(file, "rwd");
    static FileLock lock= ini.getChannel().lock();

    synchronized void read() {
        try {
            String sCurrentLine;
            s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8"));
            while ((sCurrentLine = s.readLine()) != null) {
                content += sCurrentLine;
            }
            ini.seek(0);
            /* def flowFile1 = session.create()
            flowFile1 = session.putAttribute(flowFile1, "filename", "conf.xml");
            session.write(flowFile1, new StreamCallback() {
                @Override
                public void process(InputStream inputStream1, OutputStream outputStream) throws IOException {
                    outputStream.write(content.getBytes(StandardCharsets.UTF_8))
                }
            });
            session.transfer(flowFile1, REL_SUCCESS);*/
            def xml = new XmlParser().parseText(content);
            xml.'**'.findAll{it.name() == 'runAs'}.each{ it.replaceBody 'false'};
            def newxml1 = XmlUtil.serialize(xml);
            String data = newxml1;
            if (!data.isEmpty()) {
                ini.setLength(0);
                w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
                w.write(data);
                lock.release();
                w.close();
            }
            println data;
        } catch (FileNotFoundException e) {
            TimeUnit.SECONDS.sleep(5);
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (OverlappingFileLockException e) {
            TimeUnit.SECONDS.sleep(5);
            lock.release();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            w.close();
            ini.close();
        }
    }

    synchronized void write() {
        try {
            String sCurrentLine;
            s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8"));
            while ((sCurrentLine = s.readLine()) != null) {
                content += sCurrentLine;
            }
            // println content;
            ini.seek(0);
            /* def flowFile = session.get();
            if (flowFile != null) return;
            def serviceName = flowFile.getAttribute('serviceName');
            def date = flowFile.getAttribute('filename').substring(0, 10);
            def xml = new XmlParser().parseText(content)
            if (serviceName == 'borderCrossDecl') {
                xml.RS.borderCrossDecl.details.findAll({ p ->
                    p.runAs[0].text() == "false" && p.start[0].text() == date.toString();
                }).each({ p ->
                    p.start[0].value = addDays(p.start[0].text())
                    p.runAs[0].value = "true"
                })
            }*/
            def newXml = groovy.xml.XmlUtil.serialize(content)
            String data = newXml.toString();
            if (!data.isEmpty()) {
                ini.setLength(0);
                w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
                w.write(data);
                lock.release();
                w.close();
            }
        } catch (FileNotFoundException e) {
            TimeUnit.SECONDS.sleep(5);
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (OverlappingFileLockException e) {
            TimeUnit.SECONDS.sleep(5);
            lock.release();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            w.close();
            ini.close();
        }
    }

    public static void main(String [] args){
        Main2 r = new Main2();
        Thread one = new Thread(r);
        one.start();
    }
}
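One detail of the Main2 class posted above is that it extends Thread but never overrides run(), so one.start() launches a thread that has none of the poster's code to execute: read() and write() are never called. The sketch below shows the usual shape in plain Java, with the work placed in run() and shared state guarded by synchronized methods. It is not NiFi-specific, and Counter, Worker and runWorkers are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class RunnableSketch {
    // Shared state; both methods lock on the same Counter instance.
    static final class Counter {
        private int value;

        synchronized void increment() {
            value++;
        }

        synchronized int get() {
            return value;
        }
    }

    // The work a thread performs lives in run(); Thread.start() invokes it
    // on the new thread.
    static final class Worker implements Runnable {
        private final Counter counter;
        private final int times;

        Worker(Counter counter, int times) {
            this.counter = counter;
            this.times = times;
        }

        @Override
        public void run() {
            for (int i = 0; i < times; i++) {
                counter.increment();
            }
        }
    }

    // Start `threads` workers, wait for all of them, return the final count.
    static int runWorkers(int threads, int timesEach) {
        Counter counter = new Counter();
        List<Thread> started = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread thread = new Thread(new Worker(counter, timesEach));
            thread.start();
            started.add(thread);
        }
        for (Thread thread : started) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(4, 1000)); // 4000: no increments are lost
    }
}
```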