IGNITE-4800: Lucene query may fail with NPE. This closes #2315.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/359777f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/359777f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/359777f2

Branch: refs/heads/ignite-5872
Commit: 359777f2355483b3d206b874117d680076a03853
Parents: 48c914d
Author: Andrey V. Mashenkov <andrey.mashen...@gmail.com>
Authored: Mon Jul 31 17:32:12 2017 +0300
Committer: Andrey V. Mashenkov <andrey.mashen...@gmail.com>
Committed: Fri Aug 4 18:06:18 2017 +0300

----------------------------------------------------------------------
 .../query/h2/opt/GridLuceneDirectory.java       | 64 +++++++++++---
 .../processors/query/h2/opt/GridLuceneFile.java | 91 +++++++++++++++-----
 .../query/h2/opt/GridLuceneIndex.java           |  3 +-
 .../query/h2/opt/GridLuceneInputStream.java     | 42 ++++++---
 .../query/h2/opt/GridLuceneOutputStream.java    | 18 +++-
 ...teCacheFullTextQueryNodeJoiningSelfTest.java |  3 +-
 .../IgniteCacheQuerySelfTestSuite.java          |  2 +
 7 files changed, 176 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/359777f2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneDirectory.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneDirectory.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneDirectory.java
index ff20987..3ac9641 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneDirectory.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneDirectory.java
@@ -21,22 +21,27 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.lucene.store.BaseDirectory;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.Accountables;
 
 /**
  * A memory-resident {@link Directory} implementation.
  */
-public class GridLuceneDirectory extends BaseDirectory {
+public class GridLuceneDirectory extends BaseDirectory implements Accountable {
     /** */
     protected final Map<String, GridLuceneFile> fileMap = new 
ConcurrentHashMap<>();
 
@@ -51,7 +56,7 @@ public class GridLuceneDirectory extends BaseDirectory {
      *
      * @param mem Memory.
      */
-    public GridLuceneDirectory(GridUnsafeMemory mem) {
+    GridLuceneDirectory(GridUnsafeMemory mem) {
         super(new GridLuceneLockFactory());
 
         this.mem = mem;
@@ -64,10 +69,7 @@ public class GridLuceneDirectory extends BaseDirectory {
         // and the code below is resilient to map changes during the array 
population.
         Set<String> fileNames = fileMap.keySet();
 
-        List<String> names = new ArrayList<>(fileNames.size());
-
-        for (String name : fileNames)
-            names.add(name);
+        List<String> names = new ArrayList<>(fileNames);
 
         return names.toArray(new String[names.size()]);
     }
@@ -82,6 +84,7 @@ public class GridLuceneDirectory extends BaseDirectory {
             throw new FileNotFoundException(source);
 
         fileMap.put(dest, file);
+
         fileMap.remove(source);
     }
 
@@ -101,21 +104,25 @@ public class GridLuceneDirectory extends BaseDirectory {
     @Override public void deleteFile(String name) throws IOException {
         ensureOpen();
 
-        doDeleteFile(name);
+        doDeleteFile(name, false);
     }
 
     /**
      * Deletes file.
      *
      * @param name File name.
+     * @param onClose If on close directory;
      * @throws IOException If failed.
      */
-    private void doDeleteFile(String name) throws IOException {
+    private void doDeleteFile(String name, boolean onClose) throws IOException {
         GridLuceneFile file = fileMap.remove(name);
 
         if (file != null) {
             file.delete();
 
+            // All files should be closed when Directory is closing.
+            assert !onClose || !file.hasRefs() : "Possible memory leak, resource is not closed: " + file.toString();
+
             sizeInBytes.addAndGet(-file.getSizeInBytes());
         }
         else
@@ -128,7 +135,10 @@ public class GridLuceneDirectory extends BaseDirectory {
 
         GridLuceneFile file = newRAMFile();
 
-        GridLuceneFile existing = fileMap.remove(name);
+        // Lock for using in stream. Will be unlocked on stream closing.
+        file.lockRef();
+
+        GridLuceneFile existing = fileMap.put(name, file);
 
         if (existing != null) {
             sizeInBytes.addAndGet(-existing.getSizeInBytes());
@@ -136,8 +146,6 @@ public class GridLuceneDirectory extends BaseDirectory {
             existing.delete();
         }
 
-        fileMap.put(name, file);
-
         return new GridLuceneOutputStream(file);
     }
 
@@ -165,6 +173,16 @@ public class GridLuceneDirectory extends BaseDirectory {
         if (file == null)
             throw new FileNotFoundException(name);
 
+        // Lock for using in stream. Will be unlocked on stream closing.
+        file.lockRef();
+
+        if (!fileMap.containsKey(name)) {
+            // Unblock for deferred delete.
+            file.releaseRef();
+
+            throw new FileNotFoundException(name);
+        }
+
         return new GridLuceneInputStream(name, file);
     }
 
@@ -172,16 +190,36 @@ public class GridLuceneDirectory extends BaseDirectory {
     @Override public void close() {
         isOpen = false;
 
+        IgniteException errs = null;
+
         for (String fileName : fileMap.keySet()) {
             try {
-                doDeleteFile(fileName);
+                doDeleteFile(fileName, true);
             }
             catch (IOException e) {
-                throw new IllegalStateException(e);
+                if (errs == null)
+                    errs = new IgniteException("Error closing index directory.");
+
+                errs.addSuppressed(e);
             }
         }
 
         assert fileMap.isEmpty();
+
+        if (errs != null && !F.isEmpty(errs.getSuppressed()))
+            throw errs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long ramBytesUsed() {
+        ensureOpen();
+
+        return sizeInBytes.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized Collection<Accountable> getChildResources() {
+        return Accountables.namedAccountables("file", new HashMap<>(fileMap));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/359777f2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneFile.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneFile.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneFile.java
index 3985f09..d7ae132 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneFile.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneFile.java
@@ -17,22 +17,19 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
-import java.io.Serializable;
 import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.lucene.util.Accountable;
 
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridLuceneOutputStream.BUFFER_SIZE;
 
 /**
  * Lucene file.
  */
-public class GridLuceneFile implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    public static final AtomicInteger filesCnt = new AtomicInteger();
-
+public class GridLuceneFile implements Accountable {
     /** */
     private LongArray buffers = new LongArray();
 
@@ -45,6 +42,12 @@ public class GridLuceneFile implements Serializable {
     /** */
     private volatile long sizeInBytes;
 
+    /** */
+    private final AtomicLong refCnt = new AtomicLong();
+
+    /** */
+    private final AtomicBoolean deleted = new AtomicBoolean();
+
     /**
      * File used as buffer, in no RAMDirectory
      *
@@ -52,8 +55,6 @@ public class GridLuceneFile implements Serializable {
      */
     GridLuceneFile(GridLuceneDirectory dir) {
         this.dir = dir;
-
-        filesCnt.incrementAndGet();
     }
 
     /**
@@ -93,51 +94,89 @@ public class GridLuceneFile implements Serializable {
     }
 
     /**
+     * Increment ref counter.
+     */
+    void lockRef() {
+        refCnt.incrementAndGet();
+    }
+
+    /**
+     * Decrement ref counter.
+     */
+    void releaseRef() {
+        refCnt.decrementAndGet();
+
+        deferredDelete();
+    }
+
+    /**
+     * Checks if there is file stream opened.
+     *
+     * @return {@code True} if file has external references.
+     */
+    boolean hasRefs() {
+        long refs = refCnt.get();
+
+        assert refs >= 0;
+
+        return refs != 0;
+    }
+
+    /**
      * Gets address of buffer.
      *
      * @param idx Index.
      * @return Pointer.
      */
-    protected final synchronized long getBuffer(int idx) {
+    final synchronized long getBuffer(int idx) {
         return buffers.get(idx);
     }
 
     /**
      * @return Number of buffers.
      */
-    protected final synchronized int numBuffers() {
+    final synchronized int numBuffers() {
         return buffers.size();
     }
 
     /**
-     * Expert: allocate a new buffer.
-     * Subclasses can allocate differently.
+     * Expert: allocate a new buffer. Subclasses can allocate differently.
      *
      * @return allocated buffer.
      */
-    protected long newBuffer() {
+    private long newBuffer() {
         return dir.memory().allocate(BUFFER_SIZE);
     }
 
     /**
      * Deletes file and deallocates memory..
      */
-    public synchronized void delete() {
-        if (buffers == null)
+    public void delete() {
+        if (!deleted.compareAndSet(false, true))
             return;
 
+        deferredDelete();
+    }
+
+    /**
+     * Deferred delete.
+     */
+    synchronized void deferredDelete() {
+        if (!deleted.get() || hasRefs())
+            return;
+
+        assert refCnt.get() == 0;
+
         for (int i = 0; i < buffers.idx; i++)
             dir.memory().release(buffers.arr[i], BUFFER_SIZE);
 
         buffers = null;
-
-        filesCnt.decrementAndGet();
     }
 
     /**
      * @return Size in bytes.
      */
-    public long getSizeInBytes() {
+    long getSizeInBytes() {
         return sizeInBytes;
     }
 
@@ -148,6 +187,16 @@ public class GridLuceneFile implements Serializable {
         return dir;
     }
 
+    /** {@inheritDoc} */
+    @Override public long ramBytesUsed() {
+        return sizeInBytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Accountable> getChildResources() {
+        return Collections.emptyList();
+    }
+
     /**
      * Simple expandable long[] wrapper.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/359777f2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
index eed5ee4..c51eb5d 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
@@ -296,7 +296,8 @@ public class GridLuceneIndex implements AutoCloseable {
     /** {@inheritDoc} */
     @Override public void close() {
         U.closeQuiet(writer);
-        U.closeQuiet(dir);
+
+        dir.close();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/359777f2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneInputStream.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneInputStream.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneInputStream.java
index 4820af1..9b1bf0c 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneInputStream.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneInputStream.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.h2.opt;
 import java.io.EOFException;
 import java.io.IOException;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.IndexInput;
 
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridLuceneOutputStream.BUFFER_SIZE;
@@ -27,7 +28,7 @@ import static 
org.apache.ignite.internal.processors.query.h2.opt.GridLuceneOutpu
 /**
  * A memory-resident {@link IndexInput} implementation.
  */
-public class GridLuceneInputStream extends IndexInput {
+public class GridLuceneInputStream extends IndexInput implements Cloneable {
     /** */
     private GridLuceneFile file;
 
@@ -52,6 +53,11 @@ public class GridLuceneInputStream extends IndexInput {
     /** */
     private final GridUnsafeMemory mem;
 
+    /** */
+    private volatile boolean closed;
+
+    /** */
+    private boolean isClone;
     /**
      * Constructor.
      *
@@ -91,7 +97,24 @@ public class GridLuceneInputStream extends IndexInput {
 
     /** {@inheritDoc} */
     @Override public void close() {
-        // nothing to do here
+        if (!isClone) {
+            closed = true;
+
+            file.releaseRef();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IndexInput clone() {
+        GridLuceneInputStream clone = (GridLuceneInputStream) super.clone();
+
+        if(closed)
+            throw new AlreadyClosedException(toString());
+
+        clone.isClone = true;
+
+        return clone;
+
     }
 
     /** {@inheritDoc} */
@@ -222,14 +245,16 @@ public class GridLuceneInputStream extends IndexInput {
         public SlicedInputStream(String newResourceDescription, long offset, 
long length) throws IOException {
             super(newResourceDescription, GridLuceneInputStream.this.file, 
offset + length);
 
+            // Avoid parent resource closing together with this.
+            super.isClone = true;
+
             this.offset = offset;
 
             seek(0L);
         }
 
         /** {@inheritDoc} */
-        @Override
-        public void seek(long pos) throws IOException {
+        @Override public void seek(long pos) throws IOException {
             if (pos < 0L) {
                 throw new IllegalArgumentException("Seeking to negative position: " + this);
             }
@@ -237,20 +262,17 @@ public class GridLuceneInputStream extends IndexInput {
         }
 
         /** {@inheritDoc} */
-        @Override
-        public long getFilePointer() {
+        @Override public long getFilePointer() {
             return super.getFilePointer() - offset;
         }
 
         /** {@inheritDoc} */
-        @Override
-        public long length() {
+        @Override public long length() {
             return super.length() - offset;
         }
 
         /** {@inheritDoc} */
-        @Override
-        public IndexInput slice(String sliceDescription, long ofs, long len) 
throws IOException {
+        @Override public IndexInput slice(String sliceDescription, long ofs, 
long len) throws IOException {
             return super.slice(sliceDescription, offset + ofs, len);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/359777f2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneOutputStream.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneOutputStream.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneOutputStream.java
index caea226..d8f09df 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneOutputStream.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneOutputStream.java
@@ -18,17 +18,21 @@
 package org.apache.ignite.internal.processors.query.h2.opt;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.apache.lucene.store.BufferedChecksum;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.Accountables;
 
 /**
  * A memory-resident {@link IndexOutput} implementation.
  */
-public class GridLuceneOutputStream extends IndexOutput {
+public class GridLuceneOutputStream extends IndexOutput implements Accountable 
{
     /** Off-heap page size. */
     static final int BUFFER_SIZE = 32 * 1024;
 
@@ -93,6 +97,8 @@ public class GridLuceneOutputStream extends IndexOutput {
     /** {@inheritDoc} */
     @Override public void close() throws IOException {
         flush();
+
+        file.releaseRef();
     }
 
     /** {@inheritDoc} */
@@ -201,4 +207,14 @@ public class GridLuceneOutputStream extends IndexOutput {
             bufPosition += toCp;
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public long ramBytesUsed() {
+        return file.getSizeInBytes();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Accountable> getChildResources() {
+        return Collections.singleton(Accountables.namedAccountable("file", 
file));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/359777f2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheFullTextQueryNodeJoiningSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheFullTextQueryNodeJoiningSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheFullTextQueryNodeJoiningSelfTest.java
index ba0324f..162b1e5 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheFullTextQueryNodeJoiningSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheFullTextQueryNodeJoiningSelfTest.java
@@ -42,7 +42,6 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
- * TODO https://issues.apache.org/jira/browse/IGNITE-2229
  * Tests cache in-place modification logic with iterative value increment.
  */
 public class IgniteCacheFullTextQueryNodeJoiningSelfTest extends 
GridCommonAbstractTest {
@@ -107,6 +106,8 @@ public class IgniteCacheFullTextQueryNodeJoiningSelfTest 
extends GridCommonAbstr
      * @throws Exception If failed.
      */
     public void testFullTextQueryNodeJoin() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-2229");
+
         for (int r = 0; r < 5; r++) {
             startGrids(GRID_CNT);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/359777f2/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 258eed8..1ad0d4b 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -47,6 +47,7 @@ import 
org.apache.ignite.internal.processors.cache.IgniteCacheCollocatedQuerySel
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheDeleteSqlQuerySelfTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheDuplicateEntityConfigurationSelfTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheFieldsQueryNoDataSelfTest;
+import 
org.apache.ignite.internal.processors.cache.IgniteCacheFullTextQueryNodeJoiningSelfTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheInsertSqlQuerySelfTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheJoinPartitionedAndReplicatedTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheJoinQueryWithAffinityKeyTest;
@@ -273,6 +274,7 @@ public class IgniteCacheQuerySelfTestSuite extends 
TestSuite {
 
         // Full text queries.
         suite.addTestSuite(GridCacheFullTextQuerySelfTest.class);
+        suite.addTestSuite(IgniteCacheFullTextQueryNodeJoiningSelfTest.class);
 
         // Ignite cache and H2 comparison.
         suite.addTestSuite(BaseH2CompareQueryTest.class);

Reply via email to