On 16/03/16 20:05, Dick Murray wrote:
Right, I think I cracked it! :-)
Two classes are defined below: one extends DatasetGraphInMemory, the other
provides a small test (basically a quad echo).
Simple overview:
addToNamedGraph writes the quad into a separate QuadTable if the
transaction is READ; otherwise it calls super.
findInSpecificNamedGraph returns the super find unless the transaction is
READ and delayed quads exist for the requested pattern, in which case it
returns the union of the super find and the separate QuadTable.find.
end() checks whether the transaction is READ and separate quad tables exist;
if they do, it ends the READ, begins a WRITE transaction, copies the quads,
and then updates a global set of cached quads.
I'm sure this upholds read committed for threads holding READ, and also for
the thread holding the READ that needs to WRITE (because of the union).
Subsequent threads that READ will see the changes of the WRITE after it
has been committed. The delayed WRITE in end() proceeds as a normal
WRITE, blocking the READ thread from continuing.
Comments please?
Sounds OK.
But the code got mangled :-(
[
Thunderbird-ism? It seems to remove indentation on copy-and-paste - it's
annoying and I haven't found a way to stop it.
]
andy
The following class will create the quad it's asked to find the first time
it is asked to find it.
package org.iungo.dataset;
import org.apache.jena.sparql.core.Quad;
/**
 * Small test dataset whose only on-demand hook adds the requested quad itself, so a lookup
 * "echoes" its own pattern back into the dataset.
 *
 * <p>Whether a pattern is echoed more than once is decided by {@link CacheOnDemand}, which
 * only forwards patterns not yet recorded in its cached set.
 */
public class DatasetGraphEcho extends DatasetGraphOnDemand {

    /** Creates the dataset and registers the single echoing hook. */
    public DatasetGraphEcho() {
        onDemand.add(new EchoingCache());
    }

    /** On-demand hook that adds exactly the quad it was asked for. */
    private final class EchoingCache extends CacheOnDemand {
        @Override
        public void cache(Quad requested) {
            add(requested);
        }
    }
}
It does so by extending this class:
package org.iungo.dataset;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.compose.CompositionBase;
import org.apache.jena.query.ReadWrite;
import org.apache.jena.sparql.core.Quad;
import org.apache.jena.sparql.core.mem.DatasetGraphInMemory;
import org.apache.jena.sparql.core.mem.HexTable;
import org.apache.jena.sparql.core.mem.QuadTable;
import org.apache.jena.util.iterator.WrappedIterator;
public abstract class DatasetGraphOnDemand extends DatasetGraphInMemory {
protected static class DelayedWrite {
protected final QuadTable quadTable = new HexTable();
}
protected final ThreadLocal<Map<Quad, DelayedWrite>> delayedWrites =
ThreadLocal.withInitial(() -> new HashMap<>());
protected static interface OnDemand extends Consumer<Quad> {
}
protected Set<Quad> cached = ConcurrentHashMap.newKeySet();
protected abstract class CacheOnDemand implements OnDemand {
@Override
public void accept(Quad q) {
if (!cached.contains(q)) {
cache(q);
}
}
abstract void cache(Quad q);
}
protected final List<OnDemand> onDemand = new LinkedList<>();
@Override
protected void addToDftGraph(Node s, Node p, Node o) {
throw new UnsupportedOperationException();
}
@Override
protected void addToNamedGraph(Node g, Node s, Node p, Node o) {
if (transactionType().equals(ReadWrite.READ)) {
Map<Quad, DelayedWrite> m = delayedWrites.get();
Quad q = new Quad(g, s, p, o);
DelayedWrite delayedWrite = m.get(q);
if (delayedWrite == null) {
delayedWrite = new DelayedWrite();
m.put(q, delayedWrite);
}
delayedWrite.quadTable.add(q);
} else {
super.addToNamedGraph(g, s, p, o);
}
}
@Override
protected Iterator<Quad> findInSpecificNamedGraph(Node g, Node s, Node p,
Node o) {
final Quad q = new Quad(g, s, p, o);
onDemand.forEach(u -> u.accept(q));
final Iterator<Quad> i = super.findInSpecificNamedGraph(g, s, p, o);
if (transactionType().equals(ReadWrite.READ)) {
DelayedWrite delayedWrite = delayedWrites.get().get(q);
if (delayedWrite == null) {
return i;
} else {
Set<Quad> seen = new HashSet<>();
/*
* Return the read quads and then the delayedWrite quads dropping any which
have already been seen so as to perserve the union contract.
*/
return CompositionBase.recording(WrappedIterator.create(i), seen).andThen(
WrappedIterator.create(delayedWrite.quadTable.find(g, s, p,
o).iterator()).filterDrop( seen::contains ));
}
} else {
return i;
}
}
@Override
public void end() {
final Boolean applyDelayedWrites = transactionType().equals(ReadWrite.READ)
&& delayedWrites.get().size() > 0;
super.end();
if (applyDelayedWrites) {
begin(ReadWrite.WRITE);
try {
Map<Quad, DelayedWrite> m = delayedWrites.get();
m.forEach(new BiConsumer<Quad, DelayedWrite>() {
@Override
public void accept(Quad t, DelayedWrite u) {
u.quadTable.begin(ReadWrite.READ);
u.quadTable.find(Node.ANY, Node.ANY, Node.ANY, Node.ANY).forEach(quad ->
add(quad));
u.quadTable.end();
}
});
commit(); // All the delay writes have now been committed to the dataset
graph.
// Add all the cached quads to the global cache
m.keySet().forEach(q -> cached.add(q));
} finally {
super.end();
}
}
}
}