This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch gg-21368
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 00e531cec9d41cae7ef267a32768e5820501e884
Author: Andrey V. Mashenkov <andrey.mashen...@gmail.com>
AuthorDate: Wed Jul 17 11:09:10 2019 +0300

    GG-20750: SQL: Fix performance drop with enabling global SQL memory quota.
---
 .../processors/query/h2/H2MemoryTracker.java       |   3 +-
 .../processors/query/h2/QueryMemoryManager.java    |  72 +++++++++----
 .../processors/query/h2/QueryMemoryTracker.java    | 113 +++++++++++----------
 .../oom/AbstractQueryMemoryTrackerSelfTest.java    |  27 ++++-
 ...yMemoryTrackerWithQueryParallelismSelfTest.java |  36 +++++++
 .../query/oom/QueryMemoryTrackerSelfTest.java      |  11 ++
 6 files changed, 183 insertions(+), 79 deletions(-)

diff --git 
a/modules/h2/src/main/java/org/apache/ignite/internal/processors/query/h2/H2MemoryTracker.java
 
b/modules/h2/src/main/java/org/apache/ignite/internal/processors/query/h2/H2MemoryTracker.java
index e1d3aac..bbcc5f4 100644
--- 
a/modules/h2/src/main/java/org/apache/ignite/internal/processors/query/h2/H2MemoryTracker.java
+++ 
b/modules/h2/src/main/java/org/apache/ignite/internal/processors/query/h2/H2MemoryTracker.java
@@ -21,6 +21,8 @@ package org.apache.ignite.internal.processors.query.h2;
  */
 public abstract class H2MemoryTracker implements AutoCloseable {
     /**
+     * Reserve memory.
+     *
      * @param size Memory to reserve in bytes.
      */
     public abstract void reserve(long size);
@@ -30,6 +32,5 @@ public abstract class H2MemoryTracker implements 
AutoCloseable {
      *
      * @param size Memory to release in bytes.
      */
-
     public abstract void release(long size);
 }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryMemoryManager.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryMemoryManager.java
index 58ae9e2..727ed35 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryMemoryManager.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryMemoryManager.java
@@ -17,6 +17,7 @@
 package org.apache.ignite.internal.processors.query.h2;
 
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongBinaryOperator;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -37,6 +38,19 @@ public class QueryMemoryManager extends H2MemoryTracker {
      */
     private static final long DFLT_MEMORY_RESERVATION_BLOCK_SIZE = 512 * KB;
 
+    /** */
+    static final LongBinaryOperator RELEASE_OP = new LongBinaryOperator() {
+        @Override public long applyAsLong(long prev, long x) {
+            long res = prev - x;
+
+            if (res < 0)
+                throw new IllegalStateException("Try to free more memory that 
ever be reserved: [" +
+                    "reserved=" + prev + ", toFree=" + x + ']');
+
+            return res;
+        }
+    };
+
     /**
      * Default query memory limit.
      *
@@ -48,6 +62,9 @@ public class QueryMemoryManager extends H2MemoryTracker {
     /** Logger. */
     private final IgniteLogger log;
 
+    /** */
+    private final LongBinaryOperator reserveOp;
+
     /** Global query memory quota. */
     //TODO GG-18629: it looks safe to make this configurable at runtime.
     private final long globalQuota;
@@ -74,48 +91,38 @@ public class QueryMemoryManager extends H2MemoryTracker {
                 (long)(Runtime.getRuntime().maxMemory() * 0.6d));
         }
 
-        long dfltSqlQryMemoryLimit = 
Long.getLong(IgniteSystemProperties.IGNITE_DEFAULT_SQL_QUERY_MEMORY_LIMIT, 0);
+        long dfltMemLimit = 
Long.getLong(IgniteSystemProperties.IGNITE_DEFAULT_SQL_QUERY_MEMORY_LIMIT, 0);
 
-        if (dfltSqlQryMemoryLimit == 0)
-            dfltSqlQryMemoryLimit = globalQuota > 0 ? globalQuota / 
IgniteConfiguration.DFLT_QUERY_THREAD_POOL_SIZE : -1;
+        if (dfltMemLimit == 0)
+            dfltMemLimit = globalQuota > 0 ? globalQuota / 
IgniteConfiguration.DFLT_QUERY_THREAD_POOL_SIZE : -1;
 
         this.blockSize = 
Long.getLong(IgniteSystemProperties.IGNITE_SQL_MEMORY_RESERVATION_BLOCK_SIZE, 
DFLT_MEMORY_RESERVATION_BLOCK_SIZE);
         this.globalQuota = globalQuota;
-        this.dfltSqlQryMemoryLimit = dfltSqlQryMemoryLimit;
+        this.dfltSqlQryMemoryLimit = dfltMemLimit;
+
+        this.reserveOp = new ReservationOp(globalQuota);
 
         this.log = ctx.log(QueryMemoryManager.class);
     }
 
     /** {@inheritDoc} */
     @Override public void reserve(long size) {
-        assert size >= 0;
-
         if (size == 0)
             return; // Nothing to do.
 
-        reserved.accumulateAndGet(size, (prev, x) -> {
-            if (prev + x > globalQuota)
-                throw new IgniteSQLException("SQL query run out of memory: 
Global quota exceeded.",
-                    IgniteQueryErrorCode.QUERY_OUT_OF_MEMORY);
+        assert size > 0;
 
-            return prev + x;
-        });
+        reserved.accumulateAndGet(size, reserveOp);
     }
 
     /** {@inheritDoc} */
     @Override public void release(long size) {
-        assert size >= 0;
-
         if (size == 0)
             return; // Nothing to do.
 
-        reserved.accumulateAndGet(-size, (prev, x) -> {
-            if (prev + x < 0)
-                throw new IllegalStateException("Try to free more memory that 
ever be reserved: [" +
-                    "reserved=" + prev + ", toFree=" + x + ']');
+        assert size > 0;
 
-            return prev + x;
-        });
+        reserved.accumulateAndGet(size, RELEASE_OP);
     }
 
     /**
@@ -172,4 +179,29 @@ public class QueryMemoryManager extends H2MemoryTracker {
         if (log.isDebugEnabled() && reserved.get() != 0)
             log.debug("Potential memory leak in SQL processor. Some query 
cursors were not closed or forget to free memory.");
     }
+
+    /** */
+    private static class ReservationOp implements LongBinaryOperator {
+        /** Operation result high bound.*/
+        private final long limit;
+
+        /**
+         * Constructor.
+         * @param limit Operation result high bound.
+         */
+        ReservationOp(long limit) {
+            this.limit = limit;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long applyAsLong(long prev, long x) {
+            long res = prev + x;
+
+            if (res > limit)
+                throw new IgniteSQLException("SQL query run out of memory: 
Global quota exceeded.",
+                    IgniteQueryErrorCode.QUERY_OUT_OF_MEMORY);
+
+            return res;
+        }
+    }
 }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryMemoryTracker.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryMemoryTracker.java
index d0ff5c2..ea31930 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryMemoryTracker.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryMemoryTracker.java
@@ -16,8 +16,6 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -28,14 +26,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
  * Track query memory usage and throws an exception if query tries to allocate 
memory over limit.
  */
 public class QueryMemoryTracker extends H2MemoryTracker implements 
AutoCloseable {
-    /** Resered field updater. */
-    private static final AtomicLongFieldUpdater<QueryMemoryTracker> 
RESERVED_UPD =
-        AtomicLongFieldUpdater.newUpdater(QueryMemoryTracker.class, 
"reserved");
-
-    /** Closed flag updater. */
-    private static final AtomicReferenceFieldUpdater<QueryMemoryTracker, 
Boolean> CLOSED_UPD =
-        AtomicReferenceFieldUpdater.newUpdater(QueryMemoryTracker.class, 
Boolean.class, "closed");
-
     /** Parent tracker. */
     private final H2MemoryTracker parent;
 
@@ -46,13 +36,13 @@ public class QueryMemoryTracker extends H2MemoryTracker 
implements AutoCloseable
     private final long blockSize;
 
     /** Memory reserved on parent. */
-    private volatile long reservedFromParent;
+    private long reservedFromParent;
 
     /** Memory reserved by query. */
-    private volatile long reserved;
+    private long reserved;
 
     /** Close flag to prevent tracker reuse. */
-    private volatile Boolean closed = Boolean.FALSE;
+    private Boolean closed = Boolean.FALSE;
 
     /**
      * Constructor.
@@ -71,40 +61,31 @@ public class QueryMemoryTracker extends H2MemoryTracker 
implements AutoCloseable
 
     /** {@inheritDoc} */
     @Override public void reserve(long size) {
-        assert !closed && size >= 0;
-
         if (size == 0)
             return;
 
-        long reserved0 = RESERVED_UPD.accumulateAndGet(this, size, (prev, x) 
-> {
-            if (prev + x > maxMem) {
-                throw new IgniteSQLException("SQL query run out of memory: 
Query quota exceeded.",
-                    IgniteQueryErrorCode.QUERY_OUT_OF_MEMORY);
-            }
+        assert size > 0;
 
-            return prev + x;
-        });
+        synchronized (this) {
+            if (closed)
+                throw new IllegalStateException("Memory tracker has been 
closed concurrently.");
 
-        if (parent != null && reserved0 > reservedFromParent) {
-            synchronized (this) {
-                assert !closed;
-
-                if (reserved0 <= reservedFromParent)
-                    return;
-
-                // If single block size is too small.
-                long blockSize = Math.max(reserved0 - reservedFromParent, 
this.blockSize);
-                // If we are too close to limit.
-                blockSize = Math.min(blockSize, maxMem - reservedFromParent);
+            long reserved0 = reserve0(size);
 
+            if (parent != null && reserved0 > reservedFromParent) {
                 try {
+                    // If single block size is too small.
+                    long blockSize = Math.max(reserved0 - reservedFromParent, 
this.blockSize);
+                    // If we are too close to limit.
+                    blockSize = Math.min(blockSize, maxMem - 
reservedFromParent);
+
                     parent.reserve(blockSize);
 
                     reservedFromParent += blockSize;
                 }
                 catch (Throwable e) {
                     // Fallback if failed to reserve.
-                    RESERVED_UPD.addAndGet(this, -size);
+                    release0(size);
 
                     throw e;
                 }
@@ -114,31 +95,50 @@ public class QueryMemoryTracker extends H2MemoryTracker 
implements AutoCloseable
 
     /** {@inheritDoc} */
     @Override public void release(long size) {
-        assert size >= 0;
-
         if (size == 0)
             return;
 
-        long reserved = RESERVED_UPD.accumulateAndGet(this, -size, (prev, x) 
-> {
-            if (prev + x < 0)
-                throw new IllegalStateException("Try to release more memory 
that were reserved: [" +
-                    "reserved=" + prev + ", toRelease=" + x + ']');
+        assert size > 0;
 
-            return prev + x;
-        });
+        synchronized (this) {
+            if (closed)
+                throw new IllegalStateException("Memory tracker has been 
closed concurrently.");
 
-        assert !closed && reserved >= 0 || reserved == 0 : "Invalid reserved 
memory size:" + reserved;
+            long reserved = release0(size);
 
-        // For now, won'tQ release memory to parent until tracker closed.
-       /* if (parent != null && preAllocated - reserved >= 2 * blockSize) {
-            synchronized (this) {
-                if (preAllocated - reserved >= 2 * blockSize) {
-                    parent.release(blockSize);
+            assert reserved >= 0;
+        }
+    }
 
-                    preAllocated -= blockSize;
-                }
-            }
-        }*/
+    /**
+     * @param size Memory to reserve in bytes.
+     * @return Reserved memory after reservation.
+     */
+    private long reserve0(long size) {
+        long res = reserved + size;
+
+        if (res > maxMem)
+            throw new IgniteSQLException("SQL query run out of memory: Query 
quota exceeded.",
+                IgniteQueryErrorCode.QUERY_OUT_OF_MEMORY);
+
+        return reserved = res;
+    }
+
+    /**
+     * Release reserved memory.
+     *
+     * @param size Memory to release in bytes.
+     * @return Reserved memory after release.
+     */
+    private long release0(long size) {
+
+        long res = reserved - size;
+
+        if (res < 0)
+            throw new IllegalStateException("Try to free more memory that ever 
be reserved: [" +
+                "reserved=" + reserved + ", toFree=" + size + ']');
+
+        return reserved = res;
     }
 
     /**
@@ -152,8 +152,13 @@ public class QueryMemoryTracker extends H2MemoryTracker 
implements AutoCloseable
     @Override public void close() {
         // It is not expected to be called concurrently with reserve\release.
         // But query can be cancelled concurrently on query finish.
-        if (CLOSED_UPD.compareAndSet(this, Boolean.FALSE, Boolean.TRUE)) {
-            release(RESERVED_UPD.get(this));
+        synchronized (this) {
+            if (closed)
+                return;
+
+            closed = true;
+
+            release0(reserved);
 
             if (parent != null)
                 parent.release(reservedFromParent);
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/AbstractQueryMemoryTrackerSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/AbstractQueryMemoryTrackerSelfTest.java
index c77ed15..2fc6231 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/AbstractQueryMemoryTrackerSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/AbstractQueryMemoryTrackerSelfTest.java
@@ -44,6 +44,7 @@ import org.h2.engine.Session;
 import org.h2.expression.Expression;
 import org.h2.result.H2BaseLocalResult;
 import org.h2.result.LocalResult;
+import org.junit.Ignore;
 import org.junit.Test;
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 
@@ -371,9 +372,10 @@ public abstract class AbstractQueryMemoryTrackerSelfTest 
extends GridCommonAbstr
 
     /** Check GROUP BY operation on indexed col. */
     @Test
+    @Ignore("https://ggsystems.atlassian.net/browse/GG-19071")
     public void testQueryWithGroupByPrimaryKey() throws Exception {
-        //TODO: GG-19071: make next test pass without hint.
-        execQuery("select K.indexed, sum(K.id) from K USE INDEX (K_IDX) GROUP 
BY K.indexed", true);
+        //TODO: GG-19071: make next query use correct index (K_IDX instead of 
primary).
+        execQuery("select K.indexed, sum(K.id) from K GROUP BY K.indexed", 
true);
 
         assertEquals(0, localResults.size());
     }
@@ -396,7 +398,7 @@ public abstract class AbstractQueryMemoryTrackerSelfTest 
extends GridCommonAbstr
 
         // Local result is quite small.
         assertEquals(1, localResults.size());
-        assertTrue(maxMem > localResults.get(0).memoryReserved());
+        assertTrue(maxMem > localResults.get(0).memoryReserved() + 1000);
         assertTrue(BIG_TABLE_SIZE > localResults.get(0).getRowCount());
     }
 
@@ -407,7 +409,7 @@ public abstract class AbstractQueryMemoryTrackerSelfTest 
extends GridCommonAbstr
 
         // Local result is quite small.
         assertEquals(1, localResults.size());
-        assertTrue(maxMem > localResults.get(0).memoryReserved());
+        assertTrue(maxMem > localResults.get(0).memoryReserved() + 1000);
         assertTrue(100 > localResults.get(0).getRowCount());
     }
 
@@ -428,6 +430,7 @@ public abstract class AbstractQueryMemoryTrackerSelfTest 
extends GridCommonAbstr
             "GROUP BY K.indexed ORDER BY a DESC", true);
 
         assertEquals(1, localResults.size());
+        assertTrue(maxMem < localResults.get(0).memoryReserved() + 1000);
         assertTrue(BIG_TABLE_SIZE > localResults.get(0).getRowCount());
     }
 
@@ -447,6 +450,9 @@ public abstract class AbstractQueryMemoryTrackerSelfTest 
extends GridCommonAbstr
     public void testQueryWithDistinctAndLowCardinality() throws Exception {
         // Distinct on indexed column with small cardinality.
         execQuery("select DISTINCT K.grp_indexed from K", false);
+
+        assertEquals(1, localResults.size());
+        assertEquals(100, localResults.get(0).getRowCount());
     }
 
     /** Check query failure with DISTINCT constraint. */
@@ -454,6 +460,9 @@ public abstract class AbstractQueryMemoryTrackerSelfTest 
extends GridCommonAbstr
     public void testQueryWithDistinctAndHighCardinality() throws Exception {
         // Distinct on indexed column with unique values.
         checkQueryExpectOOM("select DISTINCT K.id from K", true);
+
+        assertEquals(1, localResults.size());
+        assertTrue(BIG_TABLE_SIZE > localResults.get(0).getRowCount());
     }
 
     /** Check HashJoin with large table. */
@@ -473,6 +482,16 @@ public abstract class AbstractQueryMemoryTrackerSelfTest 
extends GridCommonAbstr
         }
     }
 
+    /** Check Join with large table. */
+    @Test
+    public void testJoinWithLargeTable() throws Exception {
+        maxMem = 512 * KB;
+
+        execQuery("select * from T, K where T.id = K.grp_indexed", true);
+
+        assertEquals(0, localResults.size());
+    }
+
     /** Check query failure due to global memory quota exceeded. */
     @Test
     public void testGlobalQuota() throws Exception {
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/LocalQueryMemoryTrackerWithQueryParallelismSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/LocalQueryMemoryTrackerWithQueryParallelismSelfTest.java
index acb1fc0..4569792 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/LocalQueryMemoryTrackerWithQueryParallelismSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/LocalQueryMemoryTrackerWithQueryParallelismSelfTest.java
@@ -339,4 +339,40 @@ public class 
LocalQueryMemoryTrackerWithQueryParallelismSelfTest extends Abstrac
         assertEquals(1, localResults.size());
         assertTrue(100 > localResults.get(0).getRowCount());
     }
+
+    /** Check simple query with DISTINCT constraint. */
+    @Test
+    @Override public void testQueryWithDistinctAndLowCardinality() throws 
Exception {
+        // Distinct on indexed column with small cardinality.
+        execQuery("select DISTINCT K.grp_indexed from K", false);
+
+        assertEquals(5, localResults.size());
+        assertEquals(100, localResults.get(4).getRowCount());
+    }
+
+    /** Check query failure with DISTINCT constraint. */
+    @Test
+    @Override public void testQueryWithDistinctAndHighCardinality() throws 
Exception {
+        // Distinct on indexed column with unique values.
+        checkQueryExpectOOM("select DISTINCT K.id from K", true);
+
+        assertEquals(5, localResults.size());
+        assertFalse(localResults.stream().limit(4).anyMatch(r -> 
r.memoryReserved() + 1000 > maxMem));
+        assertTrue(BIG_TABLE_SIZE > localResults.get(4).getRowCount());
+    }
+
+    /** {@inheritDoc} */
+    @Test
+    @Override public void testLazyQueryWithGroupByThenSort() {
+        maxMem = MB / 2;
+
+        // Query failed on map side due to too many groups.
+        checkQueryExpectOOM("select K.indexed, sum(K.grp) as a from K " +
+            "GROUP BY K.indexed ORDER BY a DESC", true);
+
+        // Result on reduce side.
+        assertEquals(1, localResults.size());
+        assertEquals(0, localResults.get(0).memoryReserved());
+        assertEquals(0, localResults.get(0).getRowCount());
+    }
 }
\ No newline at end of file
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryMemoryTrackerSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryMemoryTrackerSelfTest.java
index 6c29013..f612df9 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryMemoryTrackerSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/oom/QueryMemoryTrackerSelfTest.java
@@ -176,6 +176,17 @@ public class QueryMemoryTrackerSelfTest extends 
AbstractQueryMemoryTrackerSelfTe
         assertTrue(BIG_TABLE_SIZE > localResults.get(0).getRowCount());
     }
 
+    /** Check simple query with DISTINCT constraint. */
+    @Test
+    @Override public void testQueryWithDistinctAndLowCardinality() throws 
Exception {
+        // Distinct on indexed column with small cardinality.
+        execQuery("select DISTINCT K.grp_indexed from K", false);
+
+        assertEquals(2, localResults.size());
+        assertEquals(100, localResults.get(0).getRowCount());
+        assertEquals(100, localResults.get(1).getRowCount());
+    }
+
     /** {@inheritDoc} */
     @Test
     @Override public void 
testLazyQueryWithGroupByIndexedColAndDistinctAggregates() {

Reply via email to