Aklakan commented on code in PR #3184: URL: https://github.com/apache/jena/pull/3184#discussion_r2313440577
########## jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/DatasetGraphOverSparql.java: ########## @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.sparql.engine.dispatch; + +import static org.apache.jena.query.ReadWrite.WRITE; + +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +import org.apache.jena.atlas.iterator.Iter; +import org.apache.jena.atlas.iterator.IteratorCloseable; +import org.apache.jena.graph.Graph; +import org.apache.jena.graph.Node; +import org.apache.jena.graph.Triple; +import org.apache.jena.query.Query; +import org.apache.jena.query.QueryFactory; +import org.apache.jena.query.ReadWrite; +import org.apache.jena.query.TxnType; +import org.apache.jena.riot.system.PrefixMap; +import org.apache.jena.riot.system.PrefixMapFactory; +import org.apache.jena.riot.system.Prefixes; +import org.apache.jena.riot.system.StreamRDF; +import org.apache.jena.sparql.JenaTransactionException; +import org.apache.jena.sparql.core.BasicPattern; +import org.apache.jena.sparql.core.DatasetGraphBase; +import org.apache.jena.sparql.core.GraphView; +import org.apache.jena.sparql.core.Quad; 
+import org.apache.jena.sparql.core.Substitute; +import org.apache.jena.sparql.core.Transactional; +import org.apache.jena.sparql.core.TransactionalNull; +import org.apache.jena.sparql.core.Var; +import org.apache.jena.sparql.engine.binding.Binding; +import org.apache.jena.sparql.engine.binding.BindingFactory; +import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.exec.UpdateExec; +import org.apache.jena.sparql.expr.Expr; +import org.apache.jena.sparql.expr.aggregate.AggCount; +import org.apache.jena.sparql.modify.request.QuadAcc; +import org.apache.jena.sparql.modify.request.QuadDataAcc; +import org.apache.jena.sparql.modify.request.UpdateDataDelete; +import org.apache.jena.sparql.modify.request.UpdateDataInsert; +import org.apache.jena.sparql.modify.request.UpdateDeleteWhere; +import org.apache.jena.sparql.modify.request.UpdateDrop; +import org.apache.jena.sparql.syntax.Element; +import org.apache.jena.sparql.syntax.ElementNamedGraph; +import org.apache.jena.sparql.syntax.ElementTriplesBlock; +import org.apache.jena.sparql.syntax.ElementUnion; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.update.Update; +import org.apache.jena.update.UpdateRequest; + +/** + * This class provides a base implementation of the Jena DatasetGraph interface + * to a remote SPARQL endpoint. Efficiency not guaranteed. + * + * Any returned iterators must be closed to free the resources. + * This base class does not support transactions. + * + * All inserts are passed on as SPARQL update requests. + * Blank nodes should be avoided because they are likely to become renamed across separate requests. + * + * Invocation of deleteAny() across the default graph and all named graphs fires two requests. + * All other methods fire a single request. 
+ */ +public abstract class DatasetGraphOverSparql + extends DatasetGraphBase +{ + private PrefixMap prefixes = PrefixMapFactory.create(); + private Transactional transactional = TransactionalNull.create(); + + public DatasetGraphOverSparql() { + super(); + initContext(); + } + + protected PrefixMap getPrefixes() { + return prefixes; + } + + protected Transactional getTransactional() { + return transactional; + } + + protected void initContext() { + Context cxt = getContext(); + // Use the context to advertise that SPARQL statements should not be parsed. + SparqlDispatcherRegistry.setParseCheck(cxt, false); + } + + protected abstract QueryExec query(Query query); + protected abstract UpdateExec update(UpdateRequest UpdateRequest); + + protected void execUpdate(Update update) { + execUpdate(new UpdateRequest(update)); + } + + protected void execUpdate(UpdateRequest updateRequest) { + UpdateExec uExec = update(updateRequest); + uExec.execute(); + } + + /** + * This method must return a StreamRDF instance that handles bulk inserts of RDF tuples (triples or quads). + * The default implementation flushes every 1000 tuples. + * Alternative implementations could e.g. flush by the string length of the update request. 
+ */ + protected StreamRDF newUpdateSink() { + StreamRDF sink = new StreamRDFToUpdateRequest(this::execUpdate, Prefixes.adapt(getPrefixes()), 1000); + return sink; + } + + @Override + public Iterator<Node> listGraphNodes() { + QueryExec qExec = query(graphsQuery); + return Iter.onClose( + Iter.map(qExec.select(), b -> b.get(vg)), + qExec::close); + } + + @Override + public Iterator<Quad> find(Node g, Node s, Node p, Node o) { + Iterator<Quad> result; + if (g == null || Node.ANY.equals(g)) { + result = findTriplesOrQuads(this::query, s, p, o); + } else if (Quad.isDefaultGraph(g)) { + Iterator<Triple> base = findTriples(this::query, s, p, o); + result = Iter.map(base, t -> Quad.create(Quad.defaultGraphIRI, t)); + } else { + result = findQuads(this::query, g, s, p, o); + } + return result; + } + + @Override + public Iterator<Quad> findNG(Node g, Node s, Node p, Node o) { + Iterator<Quad> result = findQuads(this::query, g, s, p, o); + return result; + } + + @Override + public Graph getDefaultGraph() { + DatasetGraphOverSparql self = this; + return new GraphView(this, Quad.defaultGraphNodeGenerated) { + @Override + protected int graphBaseSize() { + long size = sizeLong(); + return (size < Integer.MAX_VALUE) ? (int)size : Integer.MAX_VALUE; + } + + @Override + public long sizeLong() { + long result = fetchLong(self::query, defaultGraphSizeQuery, vc); + return result; + } + }; + } + + @Override + public Graph getGraph(Node graphNode) { + DatasetGraphOverSparql self = this; + return new GraphView(this, graphNode) { + @Override + protected int graphBaseSize() { + long size = sizeLong(); + return (size < Integer.MAX_VALUE) ? 
(int)size : Integer.MAX_VALUE; + } + + @Override + public long sizeLong() { + Query q = createQueryNamedGraphSize(graphNode, vc); + long result = fetchLong(self::query, q, vc); + return result; + } + }; + } + + @Override + public void addGraph(Node graphName, Graph graph) { + StreamRDF sink = newUpdateSink(); + try { + sink.start(); + StreamRDFToUpdateRequest.sendGraphTriplesToStream(graph, graphName, sink); + } finally { + sink.finish(); + } + } + + @Override + public void removeGraph(Node graphName) { + Objects.requireNonNull(graphName); + UpdateRequest ur = new UpdateRequest(new UpdateDrop(graphName)); + execUpdate(ur); + } + + @Override + public void add(Quad quad) { + Quad q = harmonizeTripleInQuad(quad); + if (!q.isConcrete()) { + throw new IllegalArgumentException("Concrete quad expected."); + } + Update update = new UpdateDataInsert(new QuadDataAcc(List.of(q))); + execUpdate(new UpdateRequest(update)); + } + + @Override + public void delete(Quad quad) { + Quad q = harmonizeTripleInQuad(quad); + if (!q.isConcrete()) { + throw new IllegalArgumentException("Concrete quad expected."); + } + Update update = new UpdateDataDelete(new QuadDataAcc(List.of(q))); + execUpdate(update); + } + + @Override + public void deleteAny(Node g, Node s, Node p, Node o) { + deleteAnyInternal(g, s, p, o); + boolean isAnyGraph = g == null || Node.ANY.equals(g); + if (isAnyGraph) { + deleteAnyInternal(Quad.defaultGraphIRI, s, p, o); + } + } + + @Override + public long size() { + long result = fetchLong(this::query, graphsCountQuery, vc); + return result; + } + + @Override + public boolean supportsTransactions() { + return false; + } + + @Override + public void abort() { + getTransactional().abort(); + } + + @Override + public void begin(ReadWrite readWrite) { + getTransactional().begin(readWrite); + } + + @Override + public void commit() { + getTransactional().commit(); + } + + @Override + public void end() { + // Note: AbstractTestRDFConnection.transaction_bad_01() expects Review 
Comment: The comment means that the `JenaTransactionException` must be thrown by the Dataset implementation. I think that if transactions were first-class citizens (separate objects apart from the Dataset), then this kind of check could be handled in such a transaction instance - perhaps in a default base class such as `TransactionBase`. In any case, without this check the `transaction_bad_01` test fails. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@jena.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@jena.apache.org For additional commands, e-mail: pr-h...@jena.apache.org