Hi there,

This is Abika from Marklogic. I work on a project called MLCP (Marklogic 
contentpump) which used to bulk load and transfer data to and from Marklogic 
server.

Our MLCP project uses Jena framework to process RDFs. We have built this 
project with Jena 2.13 previously and now we are transitioning to Jena 4.8.0 
(latest available). Given the time frame, there are many changes in parsing 
with Jena. Right now we are using asyncparser (following a suggestion from jena 
javadocs). I notice there is some lag when using asyncparser and its also 
mentioned in the 
Javadoc<https://javadoc.io/doc/org.apache.jena/jena-arq/latest/org.apache.jena.arq/org/apache/jena/riot/system/AsyncParser.html>.

In our codebase, To parse a bunch of files in the zip/archive, we create a 
runnable parser for each file and submit them to executor service. In this 
implementation, I see some of the files being skipped due to the latency in 
result deliverance from asyncparser API calls. For now we are considering to 
either implement a wait on the results from asyncparser (OR) stop creating 
parallel threads to process many files in a zip/archive.

I would like to get your suggestions on what are some robust ways to handle 
this latency?

Heres the code of the run() in runnable parser class. I see the latency in 
debugging when the API calls to riotparser and asyncparser doesn’t return data 
right after the call.


public void run() {

    ErrorHandler handler = new ParserErrorHandler(fsname);
    ParserProfile prof = RiotLib.profile(lang, fsname, handler);
    try {
        if (lang == Lang.TRIG) {
            rdfInputStream = AsyncParser.of(in, lang, origFn).streamQuads();
            rdfIter = rdfInputStream.iterator();
        } else if (lang == Lang.NTRIPLES) {
            rdfIter = RiotParsers.createIteratorNTriples(in,prof);
            System.out.println("2else ntriples async run ");
        } else if (lang == Lang.NQUADS) {
            rdfIter = RiotParsers.createIteratorNQuads(in,prof);
        }else {
            rdfInputStream = AsyncParser.of(in, lang, 
fsname).setChunkSize(10).streamTriples();
            rdfIter = rdfInputStream.iterator();

        }
    } catch (Exception ex) {
        failed = true;
        LOG.error("Parse error in RDF document(please check intactness and 
encoding); processing partial document:" + origFn + " " + ex.getMessage());
        ex.printStackTrace();
    }
}



pool = Executors.newFixedThreadPool(1);

RunnableParser jenaStreamingParser = new RunnableParser(origFn, fsname, in, 
lang);

pool.submit(jenaStreamingParser);



Regards,
Abika

This message and any attached documents contain information of MarkLogic and/or 
its customers that may be confidential and/or privileged. If you are not the 
intended recipient, you may not read, copy, distribute, or use this 
information. If you have received this transmission in error, please notify the 
sender immediately by reply e-mail and then delete this message. This email may 
contain pricing or other suggested contract terms related to MarkLogic software 
or services. Any such terms are not binding on MarkLogic unless and until they 
are included in a definitive agreement executed by MarkLogic.

Reply via email to