Right, I think I cracked it! :-)
Two classes defined below, one extends DatasetGraphInMemory, one provides a
small test (basically a quad echo).
Simple overview;
addToNamedGraph writes the quad into a separate QuadTable if the
transaction is READ otherwise it call super...
findInNamedGraph returns the super find if the transaction is READ
otherwise it returns the union of the super find and the separate
QuadTable.find.
end checks if the transaction is READ and separate quad tables exist and if
they do it begins a WRITE transaction and copies the quads then updates a
global set of cached quads.
I'm sure this upholds read committed for threads holding READ and also for
the thread holding the READ which needs to WRITE because of the union.
Subsequent threads which READ will see the changes of the WRITE after it
has been committed. The delayed WRITE in the end() will proceed as normal
WRITE blocking the READ thread from continuing.
Comments please?
The following class will create the quad it's asked to find the first time
it is asked to find it.
package org.iungo.dataset;
import org.apache.jena.sparql.core.Quad;
public class DatasetGraphEcho extends DatasetGraphOnDemand {
public DatasetGraphEcho() {
onDemand.add(new CacheOnDemand() {
@Override
public void cache(Quad t) {
add(t);
}
});
}
}
By extending this class...
package org.iungo.dataset;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.compose.CompositionBase;
import org.apache.jena.query.ReadWrite;
import org.apache.jena.sparql.core.Quad;
import org.apache.jena.sparql.core.mem.DatasetGraphInMemory;
import org.apache.jena.sparql.core.mem.HexTable;
import org.apache.jena.sparql.core.mem.QuadTable;
import org.apache.jena.util.iterator.WrappedIterator;
public abstract class DatasetGraphOnDemand extends DatasetGraphInMemory {
protected static class DelayedWrite {
protected final QuadTable quadTable = new HexTable();
}
protected final ThreadLocal<Map<Quad, DelayedWrite>> delayedWrites =
ThreadLocal.withInitial(() -> new HashMap<>());
protected static interface OnDemand extends Consumer<Quad> {
}
protected Set<Quad> cached = ConcurrentHashMap.newKeySet();
protected abstract class CacheOnDemand implements OnDemand {
@Override
public void accept(Quad q) {
if (!cached.contains(q)) {
cache(q);
}
}
abstract void cache(Quad q);
}
protected final List<OnDemand> onDemand = new LinkedList<>();
@Override
protected void addToDftGraph(Node s, Node p, Node o) {
throw new UnsupportedOperationException();
}
@Override
protected void addToNamedGraph(Node g, Node s, Node p, Node o) {
if (transactionType().equals(ReadWrite.READ)) {
Map<Quad, DelayedWrite> m = delayedWrites.get();
Quad q = new Quad(g, s, p, o);
DelayedWrite delayedWrite = m.get(q);
if (delayedWrite == null) {
delayedWrite = new DelayedWrite();
m.put(q, delayedWrite);
}
delayedWrite.quadTable.add(q);
} else {
super.addToNamedGraph(g, s, p, o);
}
}
@Override
protected Iterator<Quad> findInSpecificNamedGraph(Node g, Node s, Node p,
Node o) {
final Quad q = new Quad(g, s, p, o);
onDemand.forEach(u -> u.accept(q));
final Iterator<Quad> i = super.findInSpecificNamedGraph(g, s, p, o);
if (transactionType().equals(ReadWrite.READ)) {
DelayedWrite delayedWrite = delayedWrites.get().get(q);
if (delayedWrite == null) {
return i;
} else {
Set<Quad> seen = new HashSet<>();
/*
* Return the read quads and then the delayedWrite quads dropping any which
have already been seen so as to perserve the union contract.
*/
return CompositionBase.recording(WrappedIterator.create(i), seen).andThen(
WrappedIterator.create(delayedWrite.quadTable.find(g, s, p,
o).iterator()).filterDrop( seen::contains ));
}
} else {
return i;
}
}
@Override
public void end() {
final Boolean applyDelayedWrites = transactionType().equals(ReadWrite.READ)
&& delayedWrites.get().size() > 0;
super.end();
if (applyDelayedWrites) {
begin(ReadWrite.WRITE);
try {
Map<Quad, DelayedWrite> m = delayedWrites.get();
m.forEach(new BiConsumer<Quad, DelayedWrite>() {
@Override
public void accept(Quad t, DelayedWrite u) {
u.quadTable.begin(ReadWrite.READ);
u.quadTable.find(Node.ANY, Node.ANY, Node.ANY, Node.ANY).forEach(quad ->
add(quad));
u.quadTable.end();
}
});
commit(); // All the delay writes have now been committed to the dataset
graph.
// Add all the cached quads to the global cache
m.keySet().forEach(q -> cached.add(q));
} finally {
super.end();
}
}
}
}
On 15 March 2016 at 21:50, Andy Seaborne <[email protected]> wrote:
> On 15/03/16 14:12, A. Soroka wrote:
>
>> I guess you could use addGraph to intercept and alter (or substitute) the
>> graph,
>>
>
> Or
> getGraph(graphNode)
>
>
> but that seems like a real distortion of the semantics. Seems like the
>> AFS-Dev material is more to the point here.
>>
>
>
>
> Andy, what do you think it would take to get that stuff to Jena
>> master? Do you think it is ready for that? I would be happy to
>> refactor TIM to use it instead of the stuff it currently uses in
>> o.a.j.sparql.core.mem.
>>
>
> I don't think its ready - it has not been used "in anger" and it may be
> the wrong design. It needs trying out outside the codebase. TIM works as
> it currently so it isn't a rush to put it in there.
>
>
> (digression:...)
>
> I was at a talk recently about high performance java and the issue of
> object churn was mentioned as being quite impactful on the GC as the heap
> size grows. Once a long time ago, object creation was expensive ... then
> CPUS got faster and the java runtime smarter and it was less of an issue
> ... but it seems that its returning as a factor
>
> inline lambdas are apparently faster than the same code with a class
> implementation - the compiler emits an invokedynamic for the lanmbda
>
> and Java Stream can cause a lot of short-lived objects.
>
> Andy
>
>
>
>> ---
>> A. Soroka
>> The University of Virginia Library
>>
>> On Mar 15, 2016, at 7:39 AM, Dick Murray <[email protected]> wrote:
>>>
>>> Eureka moment! It returns a new Graph of a certain type. Whereas I need
>>> the
>>> graph node to determine where the underlying data is.
>>>
>>> Cheers Dick.
>>>
>>> On 15 March 2016 at 11:28, Andy Seaborne <[email protected]> wrote:
>>>
>>> On 15/03/16 10:30, Dick Murray wrote:
>>>>
>>>> Sorry, supportsTransactionAbort() in AFS-Dev
>>>>> <https://github.com/afs/AFS-Dev>/src
>>>>> <https://github.com/afs/AFS-Dev/tree/master/src>/main
>>>>> <https://github.com/afs/AFS-Dev/tree/master/src/main>/java
>>>>> <https://github.com/afs/AFS-Dev/tree/master/src/main/java>/projects
>>>>> <https://github.com/afs/AFS-Dev/tree/master/src/main/java/projects
>>>>> >/dsg2
>>>>> <
>>>>> https://github.com/afs/AFS-Dev/tree/master/src/main/java/projects/dsg2
>>>>> >/
>>>>> *DatasetGraphStorage.java*
>>>>>
>>>>>
>>>> *Experimental code.*
>>>>
>>>>
>>>>
>>>> supportsTransactionAbort is in the DatasetGraph interface in Jena.
>>>>
>>>>
>>>> DatasetGraphStorage is using TransactionalLock.createMRSW
>>>>
>>>> As mentioned, it needs cooperation from the underlying thing to be able
>>>> to
>>>> do aborts and MRSW does not provide that (it's external locking).
>>>>
>>>> DatasetGraphStorage doesn't presume that the storage unit is
>>>> transactional.
>>>>
>>>> After these discussions I've decided to create a DatasetGraphOnDemand
>>>> which
>>>>
>>>>> extends DatasetGraphMap and uses Union graphs.
>>>>>
>>>>> However in DatasetGraphMap shouldn't getGraphCreate() be
>>>>> getGraphCreate(Node graphNode) as otherwise it doesn't know what to
>>>>> create?
>>>>>
>>>>>
>>>> It creates a graph - addGraph(graphNode, g) is managing the naming.
>>>> Grapsh
>>>> don't know the name used (in other places one graph can have many
>>>> names).
>>>>
>>>> DatasetGraphMap is for a collection of independent graphs to be turned
>>>> into a dataset.
>>>>
>>>> Andy
>>>>
>>>>
>>>> @Override
>>>>> public Graph getGraph(Node graphNode)
>>>>> {
>>>>> Graph g = graphs.get(graphNode) ;
>>>>> if ( g == null )
>>>>> {
>>>>> g = getGraphCreate() ;
>>>>> if ( g != null )
>>>>> addGraph(graphNode, g) ;
>>>>> }
>>>>> return g ;
>>>>> }
>>>>>
>>>>> /** Called from getGraph when a nonexistent graph is asked for.
>>>>> * Return null for "nothing created as a graph"
>>>>> */
>>>>> protected Graph getGraphCreate() { return null ; }
>>>>>
>>>>> Dick.
>>>>>
>>>>>
>>>>>
>>>>
>>
>