Hello,
I implemented a custom property function that retrieves data from a
relational database during a SPARQL query. The idea is to keep the data
in the database and load it into the graph only when necessary.
It worked with plain Jena, but when I use it with Fuseki I run into
problems because of the locking mechanism for concurrent access.
The custom property function, which is used in a query, adds triples to
the graph (a “write”), which is not allowed inside the read transaction
of the query (see the error message below).
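A toy sketch of that situation in plain Java, to make it concrete. This is not Jena code: ToyStore, its Mode enum and queryThatWrites are made-up names for illustration. The sketch only shows the shape of the problem: every mutation checks the mode of the surrounding transaction and refuses to run unless that mode is WRITE.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a transactional store; NOT Jena code, all names are hypothetical.
class ToyStore {
    enum Mode { NONE, READ, WRITE }

    private final List<String> triples = new ArrayList<>();
    private Mode mode = Mode.NONE;

    void begin(Mode m) { mode = m; }
    void end()         { mode = Mode.NONE; }

    // Every mutation checks the mode of the surrounding transaction.
    void add(String triple) {
        if (mode != Mode.WRITE) {
            throw new IllegalStateException("Tried to write inside a " + mode + " transaction!");
        }
        triples.add(triple);
    }

    int size() { return triples.size(); }
}

class ToyStoreDemo {
    // Runs a "query" that tries to add data, as the property function does.
    static String queryThatWrites(ToyStore store, ToyStore.Mode mode) {
        store.begin(mode);
        try {
            store.add("_:obs se:currentValue \"21.5\"");
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        } finally {
            store.end();
        }
    }

    public static void main(String[] args) {
        ToyStore store = new ToyStore();
        System.out.println(queryThatWrites(store, ToyStore.Mode.READ));
        System.out.println(queryThatWrites(store, ToyStore.Mode.WRITE));
    }
}
```

In READ mode the add is rejected with the same kind of message as in the Fuseki log; in WRITE mode it goes through, which is the behaviour I am hoping to get from the query endpoint.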
Is it possible to configure Fuseki so that the query endpoint always
uses a write lock, as the update endpoint does? Since it is not a public
endpoint, the performance cost won’t be a problem.
My second approach would be to write the data from the relational
database into a second, in-memory graph. But I have no idea how to make
the connection between the two graphs from within the custom property
function.
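For the second approach, a rough plain-Java sketch of what loading on demand into a separate store could look like. A map stands in for the second in-memory graph; OnDemandStore, its loader function, the string-encoded observations and the #dp1 datapoint URI are hypothetical placeholders, not Jena or Fuseki API. The point is only that the data fetched from the database lands in a structure separate from the dataset the query is reading, so no write to that dataset is needed. It does not solve the problem of joining that side store with the main graph.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical side store, kept apart from the dataset that the query reads.
class OnDemandStore {
    private final Map<String, List<String>> byDatapoint = new ConcurrentHashMap<>();
    private final Function<String, List<String>> loader;

    OnDemandStore(Function<String, List<String>> loader) {
        this.loader = loader;
    }

    // Loads the observations for a datapoint on first access, then reuses them.
    List<String> observations(String datapointUri) {
        return byDatapoint.computeIfAbsent(datapointUri, loader);
    }

    boolean isLoaded(String datapointUri) {
        return byDatapoint.containsKey(datapointUri);
    }
}

class OnDemandStoreDemo {
    public static void main(String[] args) {
        AtomicInteger dbCalls = new AtomicInteger();
        // Stand-in for the database lookup; a real loader would query the database.
        OnDemandStore store = new OnDemandStore(uri -> {
            dbCalls.incrementAndGet();
            return Arrays.asList("21.5@2018-11-20T16:00:00", "21.7@2018-11-20T16:05:00");
        });

        String dp = "http://www.semanticweb.org/ontologies/2018/8/SensorTest.owl#dp1";
        System.out.println(store.isLoaded(dp));             // nothing is loaded before the first request
        System.out.println(store.observations(dp).size());  // first request triggers the loader
        store.observations(dp);                             // second request is served from the store
        System.out.println(dbCalls.get());                  // number of loader invocations
    }
}
```

The loader runs only for the first request per datapoint; later requests reuse what is already in the side store.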
Do you have any hints for me on how I can reach my goal of adding data
just on demand?
Here is the code and the error, to get an idea of what I am trying to do:
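import java.util.ArrayList;
import java.util.List;

import org.apache.jena.graph.Graph;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.NodeFactory;
import org.apache.jena.query.ARQ;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.rdf.model.Property;
import org.apache.jena.rdf.model.RDFNode;
import org.apache.jena.rdf.model.ResourceFactory;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.engine.QueryIterator;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.engine.binding.BindingFactory;
import org.apache.jena.sparql.engine.iterator.QueryIterPlainWrapper;
import org.apache.jena.sparql.pfunction.PFuncSimple;
import org.apache.jena.sparql.pfunction.PropertyFunction;
import org.apache.jena.sparql.pfunction.PropertyFunctionFactory;
import org.apache.jena.sparql.pfunction.PropertyFunctionRegistry;

public class GetDBValues implements PropertyFunctionFactory {

    public static void init() {
        // register the custom property function
        final PropertyFunctionRegistry reg =
            PropertyFunctionRegistry.chooseRegistry(ARQ.getContext());
        reg.put("http://www.semanticweb.org/ontologies/2018/8/SensorTest.owl#getdbValues",
            new GetDBValues());
        PropertyFunctionRegistry.set(ARQ.getContext(), reg);
    }

    @Override
    public PropertyFunction create(final String uri) {
        return new PFuncSimple() {
            @Override
            public QueryIterator execEvaluated(Binding parent, Node subject,
                    Node predicate, Node object, ExecutionContext execCxt) {
                // get the datapoint URI
                String datapointURI = subject.getURI();
                // create the model from the active graph of the execution context
                Graph graph = execCxt.getActiveGraph();
                Model model = ModelFactory.createModelForGraph(graph);
                // bindings returned to the query, one per observation
                List<Binding> bindings = new ArrayList<>();
                // here the data from the relational database is retrieved
                DatabaseConnector con = new InfluxDBMockup();
                // produce dummy data (getData is assumed to return a List<TimeValuePair>)
                List<TimeValuePair> series = con.getData("dbName", "table", "col");
                if (series != null) {
                    // separate model intended for the database entries (not used yet)
                    Model model2 = ModelFactory.createDefaultModel();
                    String ns = "http://www.semanticweb.org/ontologies/2018/8/SensorTest.owl#";
                    Property valueProperty =
                        ResourceFactory.createProperty(ns + "currentValue");
                    Property timestampProperty =
                        ResourceFactory.createProperty(ns + "timestamp");
                    for (TimeValuePair obs : series) {
                        // create a blank observation node
                        Node bNode = NodeFactory.createBlankNode();
                        RDFNode rdfNode = model.asRDFNode(bNode);
                        // add timestamp and sensor value to the graph
                        // This model.add() causes the error!!!
                        model.add(rdfNode.asResource(),
                            valueProperty, obs.getValue().toString());
                        model.add(rdfNode.asResource(),
                            timestampProperty, obs.getTimeStamp());
                        final Binding b =
                            BindingFactory.binding(parent, Var.alloc(object), bNode);
                        bindings.add(b);
                    }
                }
                return new QueryIterPlainWrapper(bindings.iterator(), execCxt);
            }
        };
    }
}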
The query looks like:
PREFIX se: <http://www.semanticweb.org/ontologies/2018/8/SensorTest.owl#>
SELECT *
WHERE {
  ?dataPoint se:hasType ?type .
  ?dataPoint se:getdbValues ?observedValue .
  ?observedValue se:timestamp ?time .
}
Here is the error:
Fuseki WARN [7] RC = 500 : Tried to write inside a READ transaction!
org.apache.jena.sparql.JenaTransactionException: Tried to write inside a READ transaction!
    at org.apache.jena.sparql.core.mem.DatasetGraphInMemory.mutate(DatasetGraphInMemory.java:382)
    at org.apache.jena.sparql.core.mem.DatasetGraphInMemory.addToDftGraph(DatasetGraphInMemory.java:416)
    at org.apache.jena.sparql.core.DatasetGraphTriplesQuads.add(DatasetGraphTriplesQuads.java:42)
    at org.apache.jena.sparql.core.GraphView.performAdd(GraphView.java:152)
    at org.apache.jena.graph.impl.GraphBase.add(GraphBase.java:181)
    at org.apache.jena.rdf.model.impl.ModelCom.add(ModelCom.java:1202)
    at org.apache.jena.rdf.model.impl.ModelCom.add(ModelCom.java:184)
    at org.apache.jena.rdf.model.impl.ModelCom.add(ModelCom.java:172)
    at at.bim4bems.GetDBValues$1.execEvaluated(GetDBValues.java:125)
    at org.apache.jena.sparql.pfunction.PFuncSimple.execEvaluated(PFuncSimple.java:45)
    at org.apache.jena.sparql.pfunction.PropertyFunctionEval.exec(PropertyFunctionEval.java:42)
    at org.apache.jena.sparql.pfunction.PropertyFunctionBase$RepeatApplyIteratorPF.nextStage(PropertyFunctionBase.java:106)
    at org.apache.jena.sparql.engine.iterator.QueryIterRepeatApply.makeNextStage(QueryIterRepeatApply.java:108)
    at org.apache.jena.sparql.engine.iterator.QueryIterRepeatApply.hasNextBinding(QueryIterRepeatApply.java:65)
    at org.apache.jena.sparql.engine.iterator.QueryIteratorBase.hasNext(QueryIteratorBase.java:114)
    at org.apache.jena.sparql.engine.iterator.QueryIterProcedure.hasNextBinding(QueryIterProcedure.java:73)
    at org.apache.jena.sparql.engine.iterator.QueryIteratorBase.hasNext(QueryIteratorBase.java:114)
    at org.apache.jena.sparql.engine.main.StageGeneratorGeneric.execute(StageGeneratorGeneric.java:61)
    at org.apache.jena.sparql.engine.main.StageGeneratorGeneric.execute(StageGeneratorGeneric.java:53)
    at org.apache.jena.tdb.solver.StageGeneratorDirectTDB.execute(StageGeneratorDirectTDB.java:53)
    at org.apache.jena.tdb2.solver.StageGeneratorDirectTDB.execute(StageGeneratorDirectTDB.java:59)
    at org.apache.jena.sparql.engine.main.OpExecutor.execute(OpExecutor.java:128)
    at org.apache.jena.sparql.engine.main.ExecutionDispatch.visit(ExecutionDispatch.java:58)
    at org.apache.jena.sparql.algebra.op.OpBGP.visit(OpBGP.java:49)
    at org.apache.jena.sparql.engine.main.ExecutionDispatch.exec(ExecutionDispatch.java:46)
    at org.apache.jena.sparql.engine.main.OpExecutor.exec(OpExecutor.java:117)
    at org.apache.jena.sparql.engine.main.OpExecutor.execute(OpExecutor.java:228)
    at org.apache.jena.sparql.engine.main.ExecutionDispatch.visit(ExecutionDispatch.java:130)
    at org.apache.jena.sparql.algebra.op.OpSequence.visit(OpSequence.java:75)
    at org.apache.jena.sparql.engine.main.ExecutionDispatch.exec(ExecutionDispatch.java:46)
    at org.apache.jena.sparql.engine.main.OpExecutor.exec(OpExecutor.java:117)
    at org.apache.jena.sparql.engine.main.OpExecutor.execute(OpExecutor.java:88)
    at org.apache.jena.sparql.engine.main.QC.execute(QC.java:52)
    at org.apache.jena.sparql.engine.main.QueryEngineMain.eval(QueryEngineMain.java:55)
    at org.apache.jena.sparql.engine.QueryEngineBase.evaluate(QueryEngineBase.java:175)
    at org.apache.jena.sparql.engine.QueryEngineBase.createPlan(QueryEngineBase.java:131)
    at org.apache.jena.sparql.engine.QueryEngineBase.getPlan(QueryEngineBase.java:112)
    at org.apache.jena.sparql.engine.main.QueryEngineMain$QueryEngineMainFactory.create(QueryEngineMain.java:90)
    at org.apache.jena.sparql.engine.QueryExecutionBase.getPlan(QueryExecutionBase.java:593)
    at org.apache.jena.sparql.engine.QueryExecutionBase.startQueryIterator(QueryExecutionBase.java:542)
    at org.apache.jena.sparql.engine.QueryExecutionBase.execResultSet(QueryExecutionBase.java:581)
    at org.apache.jena.sparql.engine.QueryExecutionBase.execSelect(QueryExecutionBase.java:204)
    at org.apache.jena.fuseki.servlets.SPARQL_Query.executeQuery(SPARQL_Query.java:344)
    at org.apache.jena.fuseki.servlets.SPARQL_Query.execute(SPARQL_Query.java:288)
    at org.apache.jena.fuseki.servlets.SPARQL_Query.executeWithParameter(SPARQL_Query.java:242)
    at org.apache.jena.fuseki.servlets.SPARQL_Query.perform(SPARQL_Query.java:227)
    at org.apache.jena.fuseki.servlets.ActionService.executeLifecycle(ActionService.java:183)
    at org.apache.jena.fuseki.servlets.ActionService.execCommonWorker(ActionService.java:98)
    at org.apache.jena.fuseki.servlets.ActionBase.doCommon(ActionBase.java:74)
    at org.apache.jena.fuseki.servlets.FusekiFilter.doFilter(FusekiFilter.java:73)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
    at org.apache.shiro.web.servlet.ProxiedFilterChain.doFilter(ProxiedFilterChain.java:61)
    at org.apache.shiro.web.servlet.AdviceFilter.executeChain(AdviceFilter.java:108)
    at org.apache.shiro.web.servlet.AdviceFilter.doFilterInternal(AdviceFilter.java:137)
    at org.apache.shiro.web.servlet.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:125)
    at org.apache.shiro.web.servlet.ProxiedFilterChain.doFilter(ProxiedFilterChain.java:66)
    at org.apache.shiro.web.servlet.AbstractShiroFilter.executeChain(AbstractShiroFilter.java:449)
    at org.apache.shiro.web.servlet.AbstractShiroFilter$1.call(AbstractShiroFilter.java:365)
    at org.apache.shiro.subject.support.SubjectCallable.doCall(SubjectCallable.java:90)
    at org.apache.shiro.subject.support.SubjectCallable.call(SubjectCallable.java:83)
    at org.apache.shiro.subject.support.DelegatingSubject.execute(DelegatingSubject.java:383)
    at org.apache.shiro.web.servlet.AbstractShiroFilter.doFilterInternal(AbstractShiroFilter.java:362)
    at org.apache.shiro.web.servlet.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:125)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
    at org.apache.jena.fuseki.servlets.CrossOriginFilter.handle(CrossOriginFilter.java:285)
    at org.apache.jena.fuseki.servlets.CrossOriginFilter.doFilter(CrossOriginFilter.java:248)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1634)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:533)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:146)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:257)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1595)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1340)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1564)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1242)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:690)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:503)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:364)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:765)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:683)
    at java.lang.Thread.run(Unknown Source)
[2018-11-20 16:00:00] Fuseki INFO [7] 500 Tried to write inside a READ transaction! (70 ms)