I have one config file inside my nifi environment, in which i should change,
check and update values , but i should make this operation in independent
threads so that(threads should't flush)

here are several things i am interested in: 
1. Can i use Synchronized code blocks inside executeScript processor? 
2.How can i use Thread locks inside nifi? 

here is my code but it doesn't work properly what should i change to make my
logic work?

 static String content = "";
static File file = new File("C:/Users/Desktop/test/conf.xml");
static BufferedReader s;
static BufferedWriter w;
static RandomAccessFile ini= new RandomAccessFile(file, "rwd");
static FileLock lock= ini.getChannel().lock();
public static synchronized void read() {
    try {
        String sCurrentLine;
        s = new BufferedReader(Channels.newReader(ini.getChannel(),
"UTF-8"));
        while ((sCurrentLine = s.readLine()) != null) {
            content += sCurrentLine;
        }
 
        ini.seek(0);
        /* def flowFile1 = session.create()
               flowFile1 = session.putAttribute(flowFile1, "filename",
"conf.xml");
               session.write(flowFile1, new StreamCallback() {
                   @Override
                   public void process(InputStream inputStream1,
OutputStream outputStream) throws IOException {
 
                      
outputStream.write(content.getBytes(StandardCharsets.UTF_8))
                   }
 
               });
               session.transfer(flowFile1, REL_SUCCESS);*/
 
        def xml = new XmlParser().parseText(content);
        //xml.'**'.findAll{it.name() == 'runAs'}.each{ it.replaceBody
'false'};
        def newxml1 = XmlUtil.serialize(xml);
        String data = newxml1;
        if (!data.isEmpty()) {
            ini.setLength(0);
            w = new BufferedWriter(Channels.newWriter(ini.getChannel(),
"UTF-8"));
            w.write(data);
            lock.release();
            w.close();
 
        }
        println data;
 
    }catch (FileNotFoundException e) {
        TimeUnit.SECONDS.sleep(50000);
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
 
    } catch(OverlappingFileLockException e){
        TimeUnit.SECONDS.sleep(50000);
        lock.release();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        w.close();
        ini.close();
    }
}
 
public static void write() {
    synchronized (Main1.class) {
        try {
 
            String sCurrentLine;
            s = new BufferedReader(Channels.newReader(ini.getChannel(),
"UTF-8"));
            while ((sCurrentLine = s.readLine()) != null) {
                content += sCurrentLine;
            }
            // println content;
            ini.seek(0);
 
            /* def flowFile = session.get();
             if (flowFile != null) return;
             def serviceName = flowFile.getAttribute('serviceName');
             def date = flowFile.getAttribute('filename').substring(0, 10);
             def xml = new XmlParser().parseText(content)
             if (serviceName == 'borderCrossDecl') {
 
                 xml.RS.borderCrossDecl.details.findAll({ p ->
                     p.runAs[0].text() == "false" && p.start[0].text() ==
date.toString();
                 }).each({ p ->
                     p.start[0].value = addDays(p.start[0].text())
                     p.runAs[0].value = "true"
                 })
             }*/
 
            def newXml = groovy.xml.XmlUtil.serialize(content)
 
            String data = newXml.toString();
            if (!data.isEmpty()) {
                ini.setLength(0);
                w = new BufferedWriter(Channels.newWriter(ini.getChannel(),
"UTF-8"));
                w.write(data);
                lock.release();
                w.close();
 
            }
        }catch (FileNotFoundException e) {
            TimeUnit.SECONDS.sleep(50000);
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
 
        } catch(OverlappingFileLockException e){
            TimeUnit.SECONDS.sleep(50000);
            lock.release();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            w.close();
            ini.close();
        }
    }
}
public static void main(String  [] args){
    read();
    write();
}



--
Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/

Reply via email to