Yep, that's mangled.
I've refactored the code into a Jena package. Do you want me to create a patch
for testing, or can it be pulled from my GitHub?


Dick

-------- Original message --------
From: Andy Seaborne <[email protected]> 
Date: 29/03/2016  10:02 am  (GMT+00:00) 
To: [email protected] 
Subject: Re: SPI DatasetGraph creating Triples/Quads on demand using
  DatasetGraphInMemory 

On 16/03/16 20:05, Dick Murray wrote:
> Right, I think I cracked it! :-)
>
> Two classes defined below, one extends DatasetGraphInMemory, one provides a
> small test (basically a quad echo).
>
> Simple overview;
>
> addToNamedGraph writes the quad into a separate QuadTable if the
> transaction is READ, otherwise it calls super...
>
> findInNamedGraph returns the super find if the transaction is READ
> otherwise it returns the union of the super find and the separate
> QuadTable.find.
>
> end checks if the transaction is READ and separate quad tables exist and if
> they do it begins a WRITE transaction and copies the quads then updates a
> global set of cached quads.
>
> I'm sure this upholds read committed for threads holding READ and also for
> the thread holding the READ which needs to WRITE because of the union.
> Subsequent threads which READ will see the changes of the WRITE after it
> has been committed. The delayed WRITE in end() will proceed as a normal
> WRITE, blocking the READ thread from continuing.
>
> Comments please?

Sounds OK.

But the code got mangled :-(

[
Thunderbird-ism? That seems to remove indentation on C&P. It's annoying
and I haven't found a way to stop it.
]

        andy

>
> The following class will create the quad it's asked to find the first time
> it is asked to find it.
>
> package org.iungo.dataset;
>
> import org.apache.jena.sparql.core.Quad;
>
> public class DatasetGraphEcho extends DatasetGraphOnDemand {
>
>     public DatasetGraphEcho() {
>         onDemand.add(new CacheOnDemand() {
>             @Override
>             public void cache(Quad t) {
>                 add(t);
>             }
>         });
>     }
> }
>
>
> By extending this class...
>
> package org.iungo.dataset;
>
> import java.util.HashMap;
> import java.util.HashSet;
> import java.util.Iterator;
> import java.util.LinkedList;
> import java.util.List;
> import java.util.Map;
> import java.util.Set;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.function.BiConsumer;
> import java.util.function.Consumer;
>
> import org.apache.jena.graph.Node;
> import org.apache.jena.graph.compose.CompositionBase;
> import org.apache.jena.query.ReadWrite;
> import org.apache.jena.sparql.core.Quad;
> import org.apache.jena.sparql.core.mem.DatasetGraphInMemory;
> import org.apache.jena.sparql.core.mem.HexTable;
> import org.apache.jena.sparql.core.mem.QuadTable;
> import org.apache.jena.util.iterator.WrappedIterator;
>
> public abstract class DatasetGraphOnDemand extends DatasetGraphInMemory {
>
>     protected static class DelayedWrite {
>         protected final QuadTable quadTable = new HexTable();
>     }
>
>     protected final ThreadLocal<Map<Quad, DelayedWrite>> delayedWrites =
>             ThreadLocal.withInitial(() -> new HashMap<>());
>
>     protected static interface OnDemand extends Consumer<Quad> {
>     }
>
>     protected Set<Quad> cached = ConcurrentHashMap.newKeySet();
>
>     protected abstract class CacheOnDemand implements OnDemand {
>         @Override
>         public void accept(Quad q) {
>             if (!cached.contains(q)) {
>                 cache(q);
>             }
>         }
>
>         abstract void cache(Quad q);
>     }
>
>     protected final List<OnDemand> onDemand = new LinkedList<>();
>
>     @Override
>     protected void addToDftGraph(Node s, Node p, Node o) {
>         throw new UnsupportedOperationException();
>     }
>
>     @Override
>     protected void addToNamedGraph(Node g, Node s, Node p, Node o) {
>         if (transactionType().equals(ReadWrite.READ)) {
>             Map<Quad, DelayedWrite> m = delayedWrites.get();
>             Quad q = new Quad(g, s, p, o);
>             DelayedWrite delayedWrite = m.get(q);
>             if (delayedWrite == null) {
>                 delayedWrite = new DelayedWrite();
>                 m.put(q, delayedWrite);
>             }
>             delayedWrite.quadTable.add(q);
>         } else {
>             super.addToNamedGraph(g, s, p, o);
>         }
>     }
>
>     @Override
>     protected Iterator<Quad> findInSpecificNamedGraph(Node g, Node s, Node p,
>             Node o) {
>         final Quad q = new Quad(g, s, p, o);
>         onDemand.forEach(u -> u.accept(q));
>         final Iterator<Quad> i = super.findInSpecificNamedGraph(g, s, p, o);
>         if (transactionType().equals(ReadWrite.READ)) {
>             DelayedWrite delayedWrite = delayedWrites.get().get(q);
>             if (delayedWrite == null) {
>                 return i;
>             } else {
>                 Set<Quad> seen = new HashSet<>();
>                 /*
>                  * Return the read quads and then the delayedWrite quads, dropping any
>                  * which have already been seen so as to preserve the union contract.
>                  */
>                 return CompositionBase.recording(WrappedIterator.create(i), seen).andThen(
>                         WrappedIterator.create(delayedWrite.quadTable.find(g, s, p, o).iterator())
>                                 .filterDrop(seen::contains));
>             }
>         } else {
>             return i;
>         }
>     }
>
>     @Override
>     public void end() {
>         final Boolean applyDelayedWrites = transactionType().equals(ReadWrite.READ)
>                 && delayedWrites.get().size() > 0;
>         super.end();
>         if (applyDelayedWrites) {
>             begin(ReadWrite.WRITE);
>             try {
>                 Map<Quad, DelayedWrite> m = delayedWrites.get();
>                 m.forEach(new BiConsumer<Quad, DelayedWrite>() {
>                     @Override
>                     public void accept(Quad t, DelayedWrite u) {
>                         u.quadTable.begin(ReadWrite.READ);
>                         u.quadTable.find(Node.ANY, Node.ANY, Node.ANY, Node.ANY)
>                                 .forEach(quad -> add(quad));
>                         u.quadTable.end();
>                     }
>                 });
>                 // All the delayed writes have now been committed to the dataset graph.
>                 commit();
>                 // Add all the cached quads to the global cache
>                 m.keySet().forEach(q -> cached.add(q));
>             } finally {
>                 super.end();
>             }
>         }
>     }
> }
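
For anyone following the overview without a Jena checkout, the delayed-write idea can be sketched in plain Java. This is only an illustration under loud assumptions: DelayedWriteSketch, Mode, and the String items are hypothetical stand-ins for the dataset, ReadWrite, and Quad; there is no real locking or isolation; and clearing the pending set after the flush is a choice made in this sketch, not something taken from the class above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, Jena-free stand-in for the delayed-write pattern. */
public class DelayedWriteSketch {

    public enum Mode { READ, WRITE }

    // Items that have been "committed" and are visible to every thread.
    private final Set<String> committed = ConcurrentHashMap.newKeySet();

    // Items added by this thread while it only held READ.
    private final ThreadLocal<Set<String>> pending =
            ThreadLocal.withInitial(LinkedHashSet::new);

    // Transaction mode of the current thread (null when outside a transaction).
    private final ThreadLocal<Mode> mode = new ThreadLocal<>();

    public void begin(Mode m) {
        mode.set(m);
    }

    /** In READ the item is parked per thread; otherwise it is stored directly. */
    public void add(String item) {
        if (mode.get() == Mode.READ) {
            pending.get().add(item);
        } else {
            committed.add(item);
        }
    }

    /** Union of committed and (in READ) pending items, without duplicates. */
    public List<String> find(String prefix) {
        Set<String> seen = new LinkedHashSet<>();
        for (String s : committed) {
            if (s.startsWith(prefix)) {
                seen.add(s);
            }
        }
        if (mode.get() == Mode.READ) {
            for (String s : pending.get()) {
                if (s.startsWith(prefix)) {
                    seen.add(s); // a Set silently drops items already seen
                }
            }
        }
        return new ArrayList<>(seen);
    }

    /** Ending a READ with parked items promotes them via a WRITE step. */
    public void end() {
        Set<String> p = pending.get();
        if (mode.get() == Mode.READ && !p.isEmpty()) {
            begin(Mode.WRITE);
            committed.addAll(p);
            p.clear(); // assumption of this sketch: do not re-apply on the next end()
        }
        mode.remove();
    }
}
```

A thread that adds an item during READ sees it immediately through find, and other threads see it only after that thread has called end().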
