IGNITE-4800: Lucene query may fails with NPE. This closes #2315.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/359777f2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/359777f2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/359777f2 Branch: refs/heads/ignite-5872 Commit: 359777f2355483b3d206b874117d680076a03853 Parents: 48c914d Author: Andrey V. Mashenkov <andrey.mashen...@gmail.com> Authored: Mon Jul 31 17:32:12 2017 +0300 Committer: Andrey V. Mashenkov <andrey.mashen...@gmail.com> Committed: Fri Aug 4 18:06:18 2017 +0300 ---------------------------------------------------------------------- .../query/h2/opt/GridLuceneDirectory.java | 64 +++++++++++--- .../processors/query/h2/opt/GridLuceneFile.java | 91 +++++++++++++++----- .../query/h2/opt/GridLuceneIndex.java | 3 +- .../query/h2/opt/GridLuceneInputStream.java | 42 ++++++--- .../query/h2/opt/GridLuceneOutputStream.java | 18 +++- ...teCacheFullTextQueryNodeJoiningSelfTest.java | 3 +- .../IgniteCacheQuerySelfTestSuite.java | 2 + 7 files changed, 176 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/359777f2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneDirectory.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneDirectory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneDirectory.java index ff20987..3ac9641 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneDirectory.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneDirectory.java @@ -21,22 +21,27 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; +import org.apache.ignite.internal.util.typedef.F; import org.apache.lucene.store.BaseDirectory; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.Accountables; /** * A memory-resident {@link Directory} implementation. */ -public class GridLuceneDirectory extends BaseDirectory { +public class GridLuceneDirectory extends BaseDirectory implements Accountable { /** */ protected final Map<String, GridLuceneFile> fileMap = new ConcurrentHashMap<>(); @@ -51,7 +56,7 @@ public class GridLuceneDirectory extends BaseDirectory { * * @param mem Memory. */ - public GridLuceneDirectory(GridUnsafeMemory mem) { + GridLuceneDirectory(GridUnsafeMemory mem) { super(new GridLuceneLockFactory()); this.mem = mem; @@ -64,10 +69,7 @@ public class GridLuceneDirectory extends BaseDirectory { // and the code below is resilient to map changes during the array population. Set<String> fileNames = fileMap.keySet(); - List<String> names = new ArrayList<>(fileNames.size()); - - for (String name : fileNames) - names.add(name); + List<String> names = new ArrayList<>(fileNames); return names.toArray(new String[names.size()]); } @@ -82,6 +84,7 @@ public class GridLuceneDirectory extends BaseDirectory { throw new FileNotFoundException(source); fileMap.put(dest, file); + fileMap.remove(source); } @@ -101,21 +104,25 @@ public class GridLuceneDirectory extends BaseDirectory { @Override public void deleteFile(String name) throws IOException { ensureOpen(); - doDeleteFile(name); + doDeleteFile(name, false); } /** * Deletes file. * * @param name File name. + * @param onClose If on close directory; * @throws IOException If failed. */ - private void doDeleteFile(String name) throws IOException { + private void doDeleteFile(String name, boolean onClose) throws IOException { GridLuceneFile file = fileMap.remove(name); if (file != null) { file.delete(); + // All files should be closed when Directory is closing. + assert !onClose || !file.hasRefs() : "Possible memory leak, resource is not closed: " + file.toString(); + sizeInBytes.addAndGet(-file.getSizeInBytes()); } else @@ -128,7 +135,10 @@ public class GridLuceneDirectory extends BaseDirectory { GridLuceneFile file = newRAMFile(); - GridLuceneFile existing = fileMap.remove(name); + // Lock for using in stream. Will be unlocked on stream closing. + file.lockRef(); + + GridLuceneFile existing = fileMap.put(name, file); if (existing != null) { sizeInBytes.addAndGet(-existing.getSizeInBytes()); @@ -136,8 +146,6 @@ public class GridLuceneDirectory extends BaseDirectory { existing.delete(); } - fileMap.put(name, file); - return new GridLuceneOutputStream(file); } @@ -165,6 +173,16 @@ public class GridLuceneDirectory extends BaseDirectory { if (file == null) throw new FileNotFoundException(name); + // Lock for using in stream. Will be unlocked on stream closing. + file.lockRef(); + + if (!fileMap.containsKey(name)) { + // Unblock for deferred delete. + file.releaseRef(); + + throw new FileNotFoundException(name); + } + return new GridLuceneInputStream(name, file); } @@ -172,16 +190,36 @@ public class GridLuceneDirectory extends BaseDirectory { @Override public void close() { isOpen = false; + IgniteException errs = null; + for (String fileName : fileMap.keySet()) { try { - doDeleteFile(fileName); + doDeleteFile(fileName, true); } catch (IOException e) { - throw new IllegalStateException(e); + if (errs == null) + errs = new IgniteException("Error closing index directory."); + + errs.addSuppressed(e); } } assert fileMap.isEmpty(); + + if (errs != null && !F.isEmpty(errs.getSuppressed())) + throw errs; + } + + /** {@inheritDoc} */ + @Override public long ramBytesUsed() { + ensureOpen(); + + return sizeInBytes.get(); + } + + /** {@inheritDoc} */ + @Override public synchronized Collection<Accountable> getChildResources() { + return Accountables.namedAccountables("file", new HashMap<>(fileMap)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/359777f2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneFile.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneFile.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneFile.java index 3985f09..d7ae132 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneFile.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneFile.java @@ -17,22 +17,19 @@ package org.apache.ignite.internal.processors.query.h2.opt; -import java.io.Serializable; import java.util.Arrays; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.lucene.util.Accountable; import static org.apache.ignite.internal.processors.query.h2.opt.GridLuceneOutputStream.BUFFER_SIZE; /** * Lucene file. */ -public class GridLuceneFile implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - public static final AtomicInteger filesCnt = new AtomicInteger(); - +public class GridLuceneFile implements Accountable { /** */ private LongArray buffers = new LongArray(); @@ -45,6 +42,12 @@ public class GridLuceneFile implements Serializable { /** */ private volatile long sizeInBytes; + /** */ + private final AtomicLong refCnt = new AtomicLong(); + + /** */ + private final AtomicBoolean deleted = new AtomicBoolean(); + /** * File used as buffer, in no RAMDirectory * @@ -52,8 +55,6 @@ public class GridLuceneFile implements Serializable { */ GridLuceneFile(GridLuceneDirectory dir) { this.dir = dir; - - filesCnt.incrementAndGet(); } /** @@ -93,51 +94,89 @@ public class GridLuceneFile implements Serializable { } /** + * Increment ref counter. + */ + void lockRef() { + refCnt.incrementAndGet(); + } + + /** + * Decrement ref counter. + */ + void releaseRef() { + refCnt.decrementAndGet(); + + deferredDelete(); + } + + /** + * Checks if there is file stream opened. + * + * @return {@code True} if file has external references. + */ + boolean hasRefs() { + long refs = refCnt.get(); + + assert refs >= 0; + + return refs != 0; + } + + /** * Gets address of buffer. * * @param idx Index. * @return Pointer. */ - protected final synchronized long getBuffer(int idx) { + final synchronized long getBuffer(int idx) { return buffers.get(idx); } /** * @return Number of buffers. */ - protected final synchronized int numBuffers() { + final synchronized int numBuffers() { return buffers.size(); } /** - * Expert: allocate a new buffer. - * Subclasses can allocate differently. + * Expert: allocate a new buffer. Subclasses can allocate differently. * * @return allocated buffer. */ - protected long newBuffer() { + private long newBuffer() { return dir.memory().allocate(BUFFER_SIZE); } /** * Deletes file and deallocates memory.. */ - public synchronized void delete() { - if (buffers == null) + public void delete() { + if (!deleted.compareAndSet(false, true)) return; + deferredDelete(); + } + + /** + * Deferred delete. + */ + synchronized void deferredDelete() { + if (!deleted.get() || hasRefs()) + return; + + assert refCnt.get() == 0; + for (int i = 0; i < buffers.idx; i++) dir.memory().release(buffers.arr[i], BUFFER_SIZE); buffers = null; - - filesCnt.decrementAndGet(); } /** * @return Size in bytes. */ - public long getSizeInBytes() { + long getSizeInBytes() { return sizeInBytes; } @@ -148,6 +187,16 @@ public class GridLuceneFile implements Serializable { return dir; } + /** {@inheritDoc} */ + @Override public long ramBytesUsed() { + return sizeInBytes; + } + + /** {@inheritDoc} */ + @Override public Collection<Accountable> getChildResources() { + return Collections.emptyList(); + } + /** * Simple expandable long[] wrapper. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/359777f2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java index eed5ee4..c51eb5d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java @@ -296,7 +296,8 @@ public class GridLuceneIndex implements AutoCloseable { /** {@inheritDoc} */ @Override public void close() { U.closeQuiet(writer); - U.closeQuiet(dir); + + dir.close(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/359777f2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneInputStream.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneInputStream.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneInputStream.java index 4820af1..9b1bf0c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneInputStream.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneInputStream.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.h2.opt; import java.io.EOFException; import java.io.IOException; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IndexInput; import static org.apache.ignite.internal.processors.query.h2.opt.GridLuceneOutputStream.BUFFER_SIZE; @@ -27,7 +28,7 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridLuceneOutpu /** * A memory-resident {@link IndexInput} implementation. */ -public class GridLuceneInputStream extends IndexInput { +public class GridLuceneInputStream extends IndexInput implements Cloneable { /** */ private GridLuceneFile file; @@ -52,6 +53,11 @@ public class GridLuceneInputStream extends IndexInput { /** */ private final GridUnsafeMemory mem; + /** */ + private volatile boolean closed; + + /** */ + private boolean isClone; /** * Constructor. * @@ -91,7 +97,24 @@ public class GridLuceneInputStream extends IndexInput { /** {@inheritDoc} */ @Override public void close() { - // nothing to do here + if (!isClone) { + closed = true; + + file.releaseRef(); + } + } + + /** {@inheritDoc} */ + @Override public IndexInput clone() { + GridLuceneInputStream clone = (GridLuceneInputStream) super.clone(); + + if(closed) + throw new AlreadyClosedException(toString()); + + clone.isClone = true; + + return clone; + } /** {@inheritDoc} */ @@ -222,14 +245,16 @@ public class GridLuceneInputStream extends IndexInput { public SlicedInputStream(String newResourceDescription, long offset, long length) throws IOException { super(newResourceDescription, GridLuceneInputStream.this.file, offset + length); + // Avoid parent resource closing together with this. + super.isClone = true; + this.offset = offset; seek(0L); } /** {@inheritDoc} */ - @Override - public void seek(long pos) throws IOException { + @Override public void seek(long pos) throws IOException { if (pos < 0L) { throw new IllegalArgumentException("Seeking to negative position: " + this); } @@ -237,20 +262,17 @@ public class GridLuceneInputStream extends IndexInput { } /** {@inheritDoc} */ - @Override - public long getFilePointer() { + @Override public long getFilePointer() { return super.getFilePointer() - offset; } /** {@inheritDoc} */ - @Override - public long length() { + @Override public long length() { return super.length() - offset; } /** {@inheritDoc} */ - @Override - public IndexInput slice(String sliceDescription, long ofs, long len) throws IOException { + @Override public IndexInput slice(String sliceDescription, long ofs, long len) throws IOException { return super.slice(sliceDescription, offset + ofs, len); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/359777f2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneOutputStream.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneOutputStream.java index caea226..d8f09df 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneOutputStream.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneOutputStream.java @@ -18,17 +18,21 @@ package org.apache.ignite.internal.processors.query.h2.opt; import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.zip.CRC32; import java.util.zip.Checksum; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.apache.lucene.store.BufferedChecksum; import org.apache.lucene.store.DataInput; import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.Accountables; /** * A memory-resident {@link IndexOutput} implementation. */ -public class GridLuceneOutputStream extends IndexOutput { +public class GridLuceneOutputStream extends IndexOutput implements Accountable { /** Off-heap page size. */ static final int BUFFER_SIZE = 32 * 1024; @@ -93,6 +97,8 @@ public class GridLuceneOutputStream extends IndexOutput { /** {@inheritDoc} */ @Override public void close() throws IOException { flush(); + + file.releaseRef(); } /** {@inheritDoc} */ @@ -201,4 +207,14 @@ public class GridLuceneOutputStream extends IndexOutput { bufPosition += toCp; } } + + /** {@inheritDoc} */ + @Override public long ramBytesUsed() { + return file.getSizeInBytes(); + } + + /** {@inheritDoc} */ + @Override public Collection<Accountable> getChildResources() { + return Collections.singleton(Accountables.namedAccountable("file", file)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/359777f2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheFullTextQueryNodeJoiningSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheFullTextQueryNodeJoiningSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheFullTextQueryNodeJoiningSelfTest.java index ba0324f..162b1e5 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheFullTextQueryNodeJoiningSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheFullTextQueryNodeJoiningSelfTest.java @@ -42,7 +42,6 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; /** - * TODO https://issues.apache.org/jira/browse/IGNITE-2229 * Tests cache in-place modification logic with iterative value increment. */ public class IgniteCacheFullTextQueryNodeJoiningSelfTest extends GridCommonAbstractTest { @@ -107,6 +106,8 @@ public class IgniteCacheFullTextQueryNodeJoiningSelfTest extends GridCommonAbstr * @throws Exception If failed. */ public void testFullTextQueryNodeJoin() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-2229"); + for (int r = 0; r < 5; r++) { startGrids(GRID_CNT); http://git-wip-us.apache.org/repos/asf/ignite/blob/359777f2/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 258eed8..1ad0d4b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheCollocatedQuerySel import org.apache.ignite.internal.processors.cache.IgniteCacheDeleteSqlQuerySelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheDuplicateEntityConfigurationSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheFieldsQueryNoDataSelfTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheFullTextQueryNodeJoiningSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheInsertSqlQuerySelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheJoinPartitionedAndReplicatedTest; import org.apache.ignite.internal.processors.cache.IgniteCacheJoinQueryWithAffinityKeyTest; @@ -273,6 +274,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { // Full text queries. suite.addTestSuite(GridCacheFullTextQuerySelfTest.class); + suite.addTestSuite(IgniteCacheFullTextQueryNodeJoiningSelfTest.class); // Ignite cache and H2 comparison. suite.addTestSuite(BaseH2CompareQueryTest.class);