I have one config file inside my nifi environment, in which i should change,
check and update values , but i should make this operation in independent
threads so that(threads should't flush)
here are several things i am interested in:
1. Can i use Synchronized code blocks inside executeScript processor?
2.How can i use Thread locks inside nifi?
here is my code but it doesn't work properly what should i change to make my
logic work?
static String content = "";
static File file = new File("C:/Users/Desktop/test/conf.xml");
static BufferedReader s;
static BufferedWriter w;
static RandomAccessFile ini= new RandomAccessFile(file, "rwd");
static FileLock lock= ini.getChannel().lock();
public static synchronized void read() {
try {
String sCurrentLine;
s = new BufferedReader(Channels.newReader(ini.getChannel(),
"UTF-8"));
while ((sCurrentLine = s.readLine()) != null) {
content += sCurrentLine;
}
ini.seek(0);
/* def flowFile1 = session.create()
flowFile1 = session.putAttribute(flowFile1, "filename",
"conf.xml");
session.write(flowFile1, new StreamCallback() {
@Override
public void process(InputStream inputStream1,
OutputStream outputStream) throws IOException {
outputStream.write(content.getBytes(StandardCharsets.UTF_8))
}
});
session.transfer(flowFile1, REL_SUCCESS);*/
def xml = new XmlParser().parseText(content);
//xml.'**'.findAll{it.name() == 'runAs'}.each{ it.replaceBody
'false'};
def newxml1 = XmlUtil.serialize(xml);
String data = newxml1;
if (!data.isEmpty()) {
ini.setLength(0);
w = new BufferedWriter(Channels.newWriter(ini.getChannel(),
"UTF-8"));
w.write(data);
lock.release();
w.close();
}
println data;
}catch (FileNotFoundException e) {
TimeUnit.SECONDS.sleep(50000);
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch(OverlappingFileLockException e){
TimeUnit.SECONDS.sleep(50000);
lock.release();
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
w.close();
ini.close();
}
}
public static void write() {
synchronized (Main1.class) {
try {
String sCurrentLine;
s = new BufferedReader(Channels.newReader(ini.getChannel(),
"UTF-8"));
while ((sCurrentLine = s.readLine()) != null) {
content += sCurrentLine;
}
// println content;
ini.seek(0);
/* def flowFile = session.get();
if (flowFile != null) return;
def serviceName = flowFile.getAttribute('serviceName');
def date = flowFile.getAttribute('filename').substring(0, 10);
def xml = new XmlParser().parseText(content)
if (serviceName == 'borderCrossDecl') {
xml.RS.borderCrossDecl.details.findAll({ p ->
p.runAs[0].text() == "false" && p.start[0].text() ==
date.toString();
}).each({ p ->
p.start[0].value = addDays(p.start[0].text())
p.runAs[0].value = "true"
})
}*/
def newXml = groovy.xml.XmlUtil.serialize(content)
String data = newXml.toString();
if (!data.isEmpty()) {
ini.setLength(0);
w = new BufferedWriter(Channels.newWriter(ini.getChannel(),
"UTF-8"));
w.write(data);
lock.release();
w.close();
}
}catch (FileNotFoundException e) {
TimeUnit.SECONDS.sleep(50000);
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch(OverlappingFileLockException e){
TimeUnit.SECONDS.sleep(50000);
lock.release();
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
w.close();
ini.close();
}
}
}
public static void main(String [] args){
read();
write();
}
--
Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/