I don't think my solution is one that could be used in production.
Anyway, the patched file is in the attachment.

D.

On 27/03/2016 13:56, Andy Seaborne wrote:
On 19/03/16 13:35, Dominique Vandensteen wrote:
I don't think having enough memory is a working solution, because we
would need that much memory only on rare occasions; most of the time
the memory would be "wasted".

During my investigation I came up with 2 causes of the problem:
1. The close method of
org.apache.jena.tdb.base.file.BufferAllocatorMapped is never called.
I quickly fixed this by adding a ThreadLocal that is used to close all
instances at transaction end. I will clean this up and use a
WeakReference, which in my opinion is a cleaner solution.

2. An issue in the JVM that is described here:
http://stackoverflow.com/questions/13065358/java-7-filechannel-not-closing-properly-after-calling-a-map-method#32062298


By implementing these two fixes I was able to use the
arq:spillToDiskThreshold option on Windows.
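The WeakReference idea can be sketched roughly like this (a minimal sketch only, not the actual patch; `LeakSweeper`, `Resource` and the method names are invented for illustration): a registry pairs a weak reference to each owner with its resource, and a periodic sweep closes resources whose owner has become unreachable.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class LeakSweeper {
    public interface Resource {
        void close();
    }

    // Pairs a weak reference to the owner with the resource to release.
    private static class Entry {
        final WeakReference<Object> owner;
        final Resource resource;
        Entry(Object owner, Resource resource) {
            this.owner = new WeakReference<>(owner);
            this.resource = resource;
        }
    }

    private static final List<Entry> entries = new ArrayList<>();

    // Register a resource; returns the weak reference for inspection.
    public static synchronized WeakReference<Object> track(Object owner, Resource resource) {
        Entry e = new Entry(owner, resource);
        entries.add(e);
        return e.owner;
    }

    // One sweep pass, as a background daemon thread would run periodically:
    // close and forget every resource whose owner is no longer reachable.
    public static synchronized int sweep() {
        int closed = 0;
        for (Iterator<Entry> it = entries.iterator(); it.hasNext(); ) {
            Entry e = it.next();
            if (e.owner.get() == null) {
                e.resource.close();
                it.remove();
                closed++;
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        Object owner = new Object();
        WeakReference<Object> ref = track(owner, () -> System.out.println("resource closed"));
        ref.clear();   // stand-in for the owner being garbage collected
        sweep();       // prints "resource closed"
    }
}
```

The sweep is a safety net for allocators that are never close()d explicitly; an explicit close at transaction end remains the primary path.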

Great - do you have a patch or pull request for that?

    Andy


Dominique

On 18/03/2016 22:27, Stephen Allen wrote:
On Fri, Mar 18, 2016 at 2:20 PM, Andy Seaborne <[email protected]> wrote:

On 18/03/16 09:16, Dominique Vandensteen wrote:

Hi,
I'm having problems handling "big" graphs (50M to 100M triples at the
current stage) in my Fuseki servers using SPARQL.
The two actions I need to do are "DROP GRAPH <...>" and "MOVE <...> TO
<...>".
Doing these actions on these graphs I get OutOfMemory errors. Some
investigation pointed me to
http://markmail.org/message/hjisrglx4eicrxyt
and

http://mail-archives.apache.org/mod_mbox/jena-users/201504.mbox/%3ccaj+mtwad1vfcnjaro37xkiwgyj7mrnillzvmsx1_nrj+rrf...@mail.gmail.com%3E


Using this config:

<#yourdatasetname> rdf:type tdb:DatasetTDB ;
    ja:context [ ja:cxtName "tdb:transactionJournalWriteBlockMode" ;
                 ja:cxtValue "mapped" ] ;
    ja:context [ ja:cxtName "arq:spillToDiskThreshold" ;
                 ja:cxtValue 10000 ] .
This solves my problem, but brings up another one: my temp folder gets
filled up with JenaTempByteBuffer-...UUID...tmp files until my disk is
full. These files remain locked, so I cannot delete them.
The files seem to be created
by org.apache.jena.tdb.base.file.BufferAllocatorMapped but are for some
reason not released.
Is there any way to work around this issue?

I'm using:
- Fuseki 2.3.1
- JVM 1.8.0_25 64-bit
- Windows 10

mapped + Windows => files don't go away until the JVM exits [1] and even
then it does not seem to be reliable according to some reports.

I thought BufferAllocatorDirect was supposed to get round this but it
allocates on direct memory (AKA malloc).

It would need a spill to plain file implementation of BufferAllocator
which we don't seem to have.

         Andy

[1]
http://bugs.java.com/view_bug.do?bug_id=4724038
and others.
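For reference, the explicit-unmap workaround discussed in the StackOverflow answer can be sketched like this (a sketch only, not Jena code; it pokes at JDK internals that worked on Java 8 but that later JDKs restrict, hence the broad catch):

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.lang.reflect.Method;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class ExplicitUnmap {
    // Best-effort: invoke the internal cleaner so the OS mapping (and, on
    // Windows, the file lock) is released immediately instead of waiting
    // for the buffer to be garbage collected. Never touch the buffer after.
    public static void unmap(MappedByteBuffer buffer) {
        try {
            Method cleanerMethod = buffer.getClass().getMethod("cleaner");
            cleanerMethod.setAccessible(true);
            Object cleaner = cleanerMethod.invoke(buffer);
            Method clean = cleaner.getClass().getMethod("clean");
            clean.setAccessible(true);
            clean.invoke(cleaner);
        } catch (Exception e) {
            // JDK 9+ blocks this reflection; the mapping is then only
            // released when the buffer itself is garbage collected.
        }
    }

    // Maps, writes, unmaps and deletes the given file; returns whether the
    // delete succeeded (on Windows this is the step that fails without unmap).
    public static boolean writeAndUnmap(File file) {
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            MappedByteBuffer buf = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 4096);
            buf.put(0, (byte) 42);
            unmap(buf);
        } catch (Exception e) {
            return false;
        }
        return file.delete();
    }

    public static void main(String[] args) throws Exception {
        File tmp = File.createTempFile("unmap-demo", ".tmp");
        System.out.println("deleted: " + writeAndUnmap(tmp));
    }
}
```

On Linux the delete succeeds regardless, which is why the leak in this thread only shows up on Windows.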


You can use the off-JVM memory that Andy mentions by changing the
"mapped"
to "direct" in your config file.  That is similar to using a memory
mapped
file, except that you are limited by the amount of memory that you have
(but if you have enough virtual memory, then there should be no problem).

That first setting is only for TDB's storage of unwritten blocks. But
when you do large updates, Jena needs to temporarily store all of the
tuples generated by the WHERE clause in memory before applying them in
the update. This is where arq:spillToDiskThreshold comes in: it
serializes those temporary tuples to disk in a regular file instead of
holding them in an in-memory array. That file is not memory mapped, so
there should be no problem with removing it after the update is
complete.
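That spill mechanism can be sketched along these lines (an illustrative sketch under my own names, not Jena's actual spill code): rows stay in memory until a threshold is crossed, then everything moves to a plain temp file, which can simply be deleted afterwards because no mapping is involved.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class SpillList implements Closeable {
    private final int threshold;
    private final List<String> memory = new ArrayList<>();
    private File spillFile;
    private PrintWriter spillOut;

    public SpillList(int threshold) {
        this.threshold = threshold;
    }

    public void add(String row) {
        if (spillOut == null && memory.size() < threshold) {
            memory.add(row);
            return;
        }
        try {
            if (spillOut == null) {
                // Threshold crossed: move the in-memory rows to a temp file.
                spillFile = File.createTempFile("spill-", ".tmp");
                spillOut = new PrintWriter(new BufferedWriter(new FileWriter(spillFile)));
                for (String m : memory) spillOut.println(m);
                memory.clear();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        spillOut.println(row);
    }

    // Read every row back, from memory or from the spill file.
    public List<String> rows() {
        if (spillOut == null) return new ArrayList<>(memory);
        spillOut.flush();
        List<String> all = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(new FileReader(spillFile))) {
            for (String line; (line = in.readLine()) != null; ) all.add(line);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return all;
    }

    @Override
    public void close() {
        if (spillOut != null) spillOut.close();
        if (spillFile != null) spillFile.delete();  // plain file: deleting it just works
    }

    public static void main(String[] args) {
        try (SpillList list = new SpillList(2)) {
            for (int i = 0; i < 5; i++) list.add("row" + i);
            System.out.println(list.rows().size());  // prints 5
        }
    }
}
```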

So basically, if "direct" works for you, then go with that (or use a
different OS like Linux for the memory mapped approach).
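Concretely, that change would look like this in the dataset config (a sketch based on the config quoted earlier in the thread; only the cxtValue changes from "mapped" to "direct"):

<#yourdatasetname> rdf:type tdb:DatasetTDB ;
    ja:context [ ja:cxtName "tdb:transactionJournalWriteBlockMode" ;
                 ja:cxtValue "direct" ] ;
    ja:context [ ja:cxtName "arq:spillToDiskThreshold" ;
                 ja:cxtValue 10000 ] .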

-Stephen






//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.jena.tdb.base.file;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.nio.ch.DirectBuffer;

import java.io.File;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

public final class BufferAllocatorMapped implements BufferAllocator {
  private static final Logger log = LoggerFactory.getLogger(BufferAllocatorMapped.class);

  private static final List<SelfReference> selfReferences = Collections.synchronizedList(new ArrayList<>());

  static {
    System.out.println("Using LNE-patched BufferAllocatorMapped with selfReferences");
    // Background daemon: periodically closes the resources of any allocator
    // whose weak reference has been cleared, i.e. an allocator that was
    // garbage collected without close() ever being called.
    Thread thread = new Thread(() -> {
      while (true) {
        try {
          for (int i = 0; i < selfReferences.size(); i++) {
            SelfReference selfReference = selfReferences.get(i);
            if (selfReference.selfReference.get() != null) continue;
            selfReference.info.close();
            selfReferences.remove(i);
            i--;
          }
          try {
            Thread.sleep(5000L);
          }
          catch (InterruptedException ignore) {
          }
        }
        catch (Throwable e) {
          log.error("CleanBufferAllocatorMapped ERROR", e);
        }
      }
    }, "CleanBufferAllocatorMapped");
    thread.setDaemon(true);
    thread.start();
  }

  // Pairs a weak reference to the allocator with the state needed to clean
  // up after it once it becomes unreachable.
  private static class SelfReference {
    private final WeakReference<BufferAllocatorMapped> selfReference;
    private final Info info;

    public SelfReference(BufferAllocatorMapped bufferAllocatorMapped) {
      this.info = bufferAllocatorMapped.info;
      selfReference = new WeakReference<>(bufferAllocatorMapped);
    }
  }

  // The closeable state of an allocator (mapped segments, temp file, open
  // file handle), held separately so it can be released after the allocator
  // itself has been garbage collected.
  private static class Info {
    private final List<MappedByteBuffer> segments;
    private final File tmpFile;
    private FileBase file;

    public Info(List<MappedByteBuffer> segments, File tmpFile) {
      this.segments = segments;
      this.tmpFile = tmpFile;
    }

    public void close() {
      // Explicitly unmap each segment so that (on Windows) the file lock is
      // released; relying on GC alone leaves the temp file undeletable.
      this.segments.forEach(segment -> {
        if (segment instanceof DirectBuffer) {
          ((DirectBuffer) segment).cleaner().clean();
        }
      });
      this.segments.clear();
      if (null != this.file) this.file.close();
      this.file = null;
      this.tmpFile.delete();
    }
  }

  private final int segmentSize = 8388608;  // 8 MiB per mapped segment
  private final int blockSize;
  private final int blocksPerSegment;
  private final Info info;
  private int seq = 0;

//  private static final ThreadLocal<List<BufferAllocatorMapped>> bufferAllocatorMappeds = new ThreadLocal<>();

//  public static void cleanForThread() {
//    List<BufferAllocatorMapped> bufferAllocatorMappeds = BufferAllocatorMapped.bufferAllocatorMappeds.get();
//    if (null == bufferAllocatorMappeds) return;
//    bufferAllocatorMappeds.stream().forEach(BufferAllocatorMapped::close);
//    BufferAllocatorMapped.bufferAllocatorMappeds.set(null);
//  }

  public BufferAllocatorMapped(int blockSize) {
    if (blockSize == 0 || blockSize > segmentSize) {
      throw new IllegalArgumentException("Illegal block size: " + blockSize);
    }
    if (segmentSize % blockSize != 0) {
      throw new IllegalArgumentException(String.format(
          "BufferAllocatorMapped: Segment size (%d) not a multiple of block size (%d)", segmentSize, blockSize));
    }

    this.blockSize = blockSize;
    this.blocksPerSegment = segmentSize / blockSize;
    info = new Info(new ArrayList<>(), this.getNewTemporaryFile());
//    this.segments = new ArrayList();
//    this.tmpFile = this.getNewTemporaryFile();
//    this.tmpFile.deleteOnExit();
    info.tmpFile.deleteOnExit();
//    List<BufferAllocatorMapped> bufferAllocatorMappeds = BufferAllocatorMapped.bufferAllocatorMappeds.get();
//    if (null == bufferAllocatorMappeds) {
//      bufferAllocatorMappeds = new ArrayList<>();
//      BufferAllocatorMapped.bufferAllocatorMappeds.set(bufferAllocatorMappeds);
//    }
//    bufferAllocatorMappeds.add(this);
    selfReferences.add(new SelfReference(this));
  }

  private final File getNewTemporaryFile() {
    File sysTempDir = new File(System.getProperty("java.io.tmpdir"));
    return new File(sysTempDir, "JenaTempByteBuffer-" + UUID.randomUUID().toString() + ".tmp");
  }

  // Segment index that holds block id.
  private final int segment(int id) {
    return id / this.blocksPerSegment;
  }

  // Byte offset of block id within its segment.
  private final int byteOffset(int id) {
    return id % this.blocksPerSegment * this.blockSize;
  }

  // Byte position of a segment within the backing temp file.
  private final long fileLocation(long segmentNumber) {
    return segmentNumber * (long) segmentSize;
  }

  public ByteBuffer allocate(int blkSize) {
    if (blkSize != this.blockSize) {
      throw new FileException("Fixed blocksize only: request=" + blkSize + " fixed size=" + this.blockSize);
    }

    // Lazily open the backing temp file on first allocation.
    if (null == info.file) {
      info.file = FileBase.create(info.tmpFile.getPath());
    }

    int id = this.seq++;
    int seg = this.segment(id);
    int segOff = this.byteOffset(id);
    MappedByteBuffer segBuffer;
    if (seg >= info.segments.size()) {
      // Map a new segment of the file.
      try {
        long fileLoc = this.fileLocation((long) seg);
        segBuffer = info.file.channel().map(MapMode.READ_WRITE, fileLoc, (long) segmentSize);
        info.segments.add(segBuffer);
      }
      catch (IOException ex) {
        throw new FileException("MappedFile.allocate: Segment=" + seg, ex);
      }
    }
    else {
      segBuffer = info.segments.get(seg);
    }

    // Carve a blockSize-sized slice out of the segment.
    segBuffer.position(segOff);
    segBuffer.limit(segOff + this.blockSize);
    ByteBuffer block = segBuffer.slice();
    segBuffer.limit(segBuffer.capacity());
    return block;
  }

  public void clear() {
    // Restart allocation from block 0; existing mapped segments are reused.
    this.seq = 0;
  }

  public void close() {
    this.clear();
    info.close();
  }
}
