Re: Do CRON driven and Yield duration work together?

2017-12-18 Thread Mark Payne
Jim,

Yes, they should work together in concert.

Thanks
-Mark


> On Dec 18, 2017, at 12:16 PM, James McMahon  wrote:
> 
> Hello. I would like to set up a processor that initiates my workflow as a 
> CRON driven processor. Furthermore, I must ensure that this processor does 
> not run for at least an hour from its previous run. 
> 
> I would set my processor CRON driven specification like this:
> 0 15 * * * ? *
> and I think I should set my Yield duration like this
> 3601 seconds
> 
> Can I use CRON driven in concert with Yield duration like I described to meet 
> this requirement? Thanks very much in advance for your help.



Do CRON driven and Yield duration work together?

2017-12-18 Thread James McMahon
Hello. I would like to set up a processor that initiates my workflow as a
CRON driven processor. Furthermore, I must ensure that this processor does
not run for at least an hour from its previous run.

I would set my processor CRON driven specification like this:
0 15 * * * ? *
and I think I should set my Yield duration like this
3601 seconds

Can I use CRON driven in concert with Yield duration like I described to
meet this requirement? Thanks very much in advance for your help.


Re: Prioritizing flowFiles to tailor throughput

2017-12-18 Thread Joe Percivall
Hey James,

Sorry, no one responded when you first sent the message but I'm curious
what you ended up doing and any findings you had. Also, wanted to bring
this thread back up to the attention of the larger group as it brings up
some interesting questions I haven't found discussed elsewhere.

On the topic of the re-sorting of the queue, I was curious about the
answer, so I dug down to the StandardFlowFileQueue and found that it's
primarily just wrapping an instance of Java's PriorityQueue for its active
queue[1]. This means that sorting is done each time a FlowFile is enqueued
but also that we have immediate access to the head of the queue. I'm sure
someone else (Mark Payne?) could explain better how we make use of the
nuances of the queue for better performance and the impacts the different
queue prioritizers have.

For the higher priority FlowFiles starving out lower priority ones, I'm
thinking about a way to give a weight instead of a priority. So in essence,
a "weighted funnel processor", which grabs X Flowfiles each time but has a
weighting assigned to different categories such that you take a certain
number of each category based on a given weight. That said, I'm not sure
that would be guaranteed to work when FlowFiles in the queue are swapped
out since even if we iterated over everything in the incoming connection,
there are still others swapped to disk. Also, there's probably performance
concerns if we tried to implement it using the current tools offered to a
processor.

For the separate NiFis approach, I'm curious what other's view is.
Personally, it makes sense to me, that for flows that are dramatically
different in priority you'd want to section it off to another instance of
NiFi. Essentially the separation between data-plane and control-plane
instances of NiFi.


Lastly, James, I assume you're limited to using the 0.7.x release for a
specific reason? I'd highly suggest upgrading to the latest version
whenever possible. There are many security and performance improvements,
and of course many new features.

[1]
https://github.com/apache/nifi/blob/7f4cfd51ea07ead6c9b71b6c6d6f87a352b801d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java#L89

Joe

On Thu, Oct 19, 2017 at 8:58 AM, James McMahon  wrote:

> Our team is considering ways to accelerate delivery for certain subsets of
> content we process through NiFi. We are using Apache NiFi 0.7.x as our
> baseline.
>
> This link discusses a recommended approach to content prioritization using
> PriorityAttributePrioritizer on a connector (queue) to tailor throughput
> based on a priority attribute we set upstream in our flow:
>
> https://stackoverflow.com/questions/42528993/how-to-
> specify-priority-attributes-for-individual-flowfiles
>
> How often does the connector queue have to re-sort contents in order to
> enforce our priority attribute? Is it re-sorting *every *single time new
> flowFiles hit the queue? Won't that markedly and negatively impact
> performance?
>
> If our priority 1s are a huge volume of flowfiles that persists over time,
> won't this approach cause our priority 2s, 3s, etc etc to languish in queue?
>
> The described approach seems to embed significant business logic in the
> NiFi workflows. In an environment where priorities change often, would that
> be considered a poor approach? Might it be better to enforce priority
> processing at a higher architectural level - a lightweight NiFi server to
> accelerate delivery of priority one content and email alerts, a priority
> two suite of NiFi servers for standard flowfile volume, a priority three
> suite of servers to handle long-term bulk processing, etc etc?
>
> Thanks in advance for your help.  -Jim
>



-- 
*Joe Percivall*
linkedin.com/in/Percivall
e: jperciv...@apache.com


how to use threads in nifi?

2017-12-18 Thread sally
I have one config file inside my nifi environment, in which i should change,
check and update values , but i should make this operation in independent
threads so that(threads should't flush)

here are several things i am interested in: 
1. Can i use Synchronized code blocks inside executeScript processor? 
2.How can i use Thread locks inside nifi? 

here is my code but it doesn't work properly what should i change to make my
logic work?

 static String content = "";
static File file = new File("C:/Users/Desktop/test/conf.xml");
static BufferedReader s;
static BufferedWriter w;
static RandomAccessFile ini= new RandomAccessFile(file, "rwd");
static FileLock lock= ini.getChannel().lock();
public static synchronized void read() {
try {
String sCurrentLine;
s = new BufferedReader(Channels.newReader(ini.getChannel(),
"UTF-8"));
while ((sCurrentLine = s.readLine()) != null) {
content += sCurrentLine;
}
 
ini.seek(0);
/* def flowFile1 = session.create()
   flowFile1 = session.putAttribute(flowFile1, "filename",
"conf.xml");
   session.write(flowFile1, new StreamCallback() {
   @Override
   public void process(InputStream inputStream1,
OutputStream outputStream) throws IOException {
 
  
outputStream.write(content.getBytes(StandardCharsets.UTF_8))
   }
 
   });
   session.transfer(flowFile1, REL_SUCCESS);*/
 
def xml = new XmlParser().parseText(content);
//xml.'**'.findAll{it.name() == 'runAs'}.each{ it.replaceBody
'false'};
def newxml1 = XmlUtil.serialize(xml);
String data = newxml1;
if (!data.isEmpty()) {
ini.setLength(0);
w = new BufferedWriter(Channels.newWriter(ini.getChannel(),
"UTF-8"));
w.write(data);
lock.release();
w.close();
 
}
println data;
 
}catch (FileNotFoundException e) {
TimeUnit.SECONDS.sleep(5);
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
 
} catch(OverlappingFileLockException e){
TimeUnit.SECONDS.sleep(5);
lock.release();
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
w.close();
ini.close();
}
}
 
public static void write() {
synchronized (Main1.class) {
try {
 
String sCurrentLine;
s = new BufferedReader(Channels.newReader(ini.getChannel(),
"UTF-8"));
while ((sCurrentLine = s.readLine()) != null) {
content += sCurrentLine;
}
// println content;
ini.seek(0);
 
/* def flowFile = session.get();
 if (flowFile != null) return;
 def serviceName = flowFile.getAttribute('serviceName');
 def date = flowFile.getAttribute('filename').substring(0, 10);
 def xml = new XmlParser().parseText(content)
 if (serviceName == 'borderCrossDecl') {
 
 xml.RS.borderCrossDecl.details.findAll({ p ->
 p.runAs[0].text() == "false" && p.start[0].text() ==
date.toString();
 }).each({ p ->
 p.start[0].value = addDays(p.start[0].text())
 p.runAs[0].value = "true"
 })
 }*/
 
def newXml = groovy.xml.XmlUtil.serialize(content)
 
String data = newXml.toString();
if (!data.isEmpty()) {
ini.setLength(0);
w = new BufferedWriter(Channels.newWriter(ini.getChannel(),
"UTF-8"));
w.write(data);
lock.release();
w.close();
 
}
}catch (FileNotFoundException e) {
TimeUnit.SECONDS.sleep(5);
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
 
} catch(OverlappingFileLockException e){
TimeUnit.SECONDS.sleep(5);
lock.release();
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
w.close();
ini.close();
}
}
}
public static void main(String  [] args){
read();
write();
}



--
Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/


how to use threads in nifi?

2017-12-18 Thread sally
I have  one  config   file  inside  my  nifi  environment,   in   which  i 
should change, check and  update  values ,  but  i  shouldmake  this  
operation in  independent threads   so  that(threads should't   flush)
here  are several  things  i am  interested  in:
1. Can  i  use  Synchronized  code  blocks  inside  executeScript 
processor?
2.How  can i  use   Thread  locks  inside nifi?

here is  my  code   but it  doesn't   work  properly  what  should i  change 
to make  my  logic   work?
class Main2 extends Thread{
static String content = "";
static File file = new File("C:/Users/Desktop/test/conf.xml");
static BufferedReader s;
static BufferedWriter w;
static RandomAccessFile ini= new RandomAccessFile(file, "rwd");
static FileLock lock= ini.getChannel().lock();
synchronized void read()
{
try {
String sCurrentLine;
s = new BufferedReader(Channels.newReader(ini.getChannel(),
"UTF-8"));
while ((sCurrentLine = s.readLine()) != null) {
content += sCurrentLine;
}

ini.seek(0);
/* def flowFile1 = session.create()
   flowFile1 = session.putAttribute(flowFile1, "filename",
"conf.xml");
   session.write(flowFile1, new StreamCallback() {
   @Override
   public void process(InputStream inputStream1,
OutputStream outputStream) throws IOException {

  
outputStream.write(content.getBytes(StandardCharsets.UTF_8))
   }

   });
   session.transfer(flowFile1, REL_SUCCESS);*/

def xml = new XmlParser().parseText(content);
xml.'**'.findAll{it.name() == 'runAs'}.each{ it.replaceBody
'false'};
def newxml1 = XmlUtil.serialize(xml);
String data = newxml1;
if (!data.isEmpty()) {
ini.setLength(0);
w = new BufferedWriter(Channels.newWriter(ini.getChannel(),
"UTF-8"));
w.write(data);
lock.release();
w.close();

}
println data;

}catch (FileNotFoundException e) {
TimeUnit.SECONDS.sleep(5);
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();

} catch(OverlappingFileLockException e){
TimeUnit.SECONDS.sleep(5);
lock.release();
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
w.close();
ini.close();
}

}

synchronized void write()
{try {

String sCurrentLine;
s = new BufferedReader(Channels.newReader(ini.getChannel(),
"UTF-8"));
while ((sCurrentLine = s.readLine()) != null) {
content += sCurrentLine;
}
// println content;
ini.seek(0);

/* def flowFile = session.get();
 if (flowFile != null) return;
 def serviceName = flowFile.getAttribute('serviceName');
 def date = flowFile.getAttribute('filename').substring(0, 10);
 def xml = new XmlParser().parseText(content)
 if (serviceName == 'borderCrossDecl') {

 xml.RS.borderCrossDecl.details.findAll({ p ->
 p.runAs[0].text() == "false" && p.start[0].text() ==
date.toString();
 }).each({ p ->
 p.start[0].value = addDays(p.start[0].text())
 p.runAs[0].value = "true"
 })
 }*/

def newXml = groovy.xml.XmlUtil.serialize(content)

String data = newXml.toString();
if (!data.isEmpty()) {
ini.setLength(0);
w = new BufferedWriter(Channels.newWriter(ini.getChannel(),
"UTF-8"));
w.write(data);
lock.release();
w.close();

}
}catch (FileNotFoundException e) {
TimeUnit.SECONDS.sleep(5);
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();

} catch(OverlappingFileLockException e){
TimeUnit.SECONDS.sleep(5);
lock.release();
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
w.close();
ini.close();
}


}

public static  void main(String [] args){
Main2 r = new Main2();
Thread one = new Thread(r);
one.start();

}
}




--
Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/